diff --git a/lib/paf.ml b/lib/paf.ml index f261c84..10f40a1 100644 --- a/lib/paf.ml +++ b/lib/paf.ml @@ -64,70 +64,84 @@ type sleep = int64 -> unit Lwt.t type 'conn runtime = (module RUNTIME with type t = 'conn) -exception Flow of Mimic.error - -module Server (Runtime : RUNTIME) : sig - val server : sleep:sleep -> Runtime.t -> Mimic.flow -> unit Lwt.t -end = struct - let src = Logs.Src.create "paf" +module Make (Flow : Mirage_flow.S) = struct + let src = Logs.Src.create "paf-flow" module Log = (val Logs.src_log src : Logs.LOG) - type server = { - flow : Mimic.flow; + type flow = { + flow : Flow.flow; sleep : sleep; queue : (char, Bigarray.int8_unsigned_elt) Ke.Rke.t; mutable rd_closed : bool; mutable wr_closed : bool; } - open Rresult - open Lwt.Infix + let create ~sleep flow = + let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in + Lwt.return { flow; sleep; queue; rd_closed = false; wr_closed = false } let safely_close flow = if flow.rd_closed && flow.wr_closed then ( Log.debug (fun m -> m "Close the connection.") ; - Mimic.close flow.flow) + Flow.close flow.flow) else Lwt.return () let blit src src_off dst dst_off len = let dst = Cstruct.of_bigarray ~off:dst_off ~len dst in Cstruct.blit src src_off dst 0 len - let rec recv flow ~read ~read_eof = - match Ke.Rke.N.peek flow.queue with - | [] -> ( - if flow.rd_closed - then - let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in - Lwt.return `Closed - else - Mimic.read flow.flow >>= function - | Error (#Mimic.error as err) -> - flow.rd_closed <- true ; - safely_close flow >>= fun () -> raise (Flow err) - | Ok `Eof -> - let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in - Log.debug (fun m -> m "[`read] Connection closed.") ; - flow.rd_closed <- true ; - safely_close flow >>= fun () -> Lwt.return `Closed - | Ok (`Data v) -> - let len = Cstruct.len v in - Log.debug (fun m -> m "<- %d byte(s)" len) ; - let _ = - Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v - in - recv flow ~read ~read_eof) - | src :: _ -> - let len = Bigstringaf.length src in - let shift = read src ~off:0 ~len in - Log.debug (fun m -> m "[`read] shift %d/%d byte(s)" shift len) ; + open Lwt.Infix + + let recv flow ~read ~read_eof = + (* match Ke.Rke.N.peek flow.queue with + | src :: _ -> + let len = Bigstringaf.length src in + let shift = read src ~off:0 ~len in + Ke.Rke.N.shift_exn flow.queue shift ; + Lwt.return `Continue + | [] when flow.rd_closed -> + let _ = read_eof Bigstringaf.empty ~off:0 ~len:0 in + Lwt.return `Closed + | [] -> *) + Ke.Rke.compress flow.queue ; + Flow.read flow.flow >>= function + | Error _ | Ok `Eof -> + flow.rd_closed <- true ; + safely_close flow >>= fun () -> + let _shift = + match + Ke.Rke.compress flow.queue ; + Ke.Rke.N.peek flow.queue + with + | [] -> read_eof Bigstringaf.empty ~off:0 ~len:0 + | [ slice ] -> read_eof slice ~off:0 ~len:(Bigstringaf.length slice) + | _ -> assert false + (* XXX(dinosaure): impossible due to [compress]. *) in + Lwt.return `Closed + | Ok (`Data v) -> + let len = Cstruct.len v in + Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v ; + let[@warning "-8"] (slice :: _) = Ke.Rke.N.peek flow.queue in + let shift = read slice ~off:0 ~len:(Bigstringaf.length slice) in Ke.Rke.N.shift_exn flow.queue shift ; - if shift = 0 then Ke.Rke.compress flow.queue ; Lwt.return `Continue - - let sleep (flow : server) timeout = + (* XXX(dinosaure): semantically, this is the closer impl. of [recv] if we + * compare with HTTP/AF. [compress] is called before any [read] and it ensures + * some assumptions needed by HTTP/AF (or Angstrom) to parse requests. + * + * Indeed, without [compress] at the beginning, it seems that HTTP/AF is not + * able to decide to close the connection. + * + * On the other side, introspect [flow.queue] before and gives slices and limit + * calls to [read] can finish to a situation with ["\r\n"] into the queue and + * HTTP/AF is not able to shift nor to finalize. + * + * In others words, [compress] seems the key to ensure that we deliver something + * good for HTTP/AF to terminate or not the connection properly. *) + + let sleep flow timeout = flow.sleep timeout >>= fun () -> Lwt.return (Error `Closed) let writev ?(timeout = 5_000_000_000L) flow iovecs = @@ -135,8 +149,7 @@ end = struct | [] -> Lwt.return (`Ok acc) | { Faraday.buffer; off; len } :: rest -> ( let raw = Cstruct.of_bigarray buffer ~off ~len in - Lwt.pick [ Mimic.write flow.flow raw; sleep flow timeout ] - >>= function + Lwt.pick [ Flow.write flow.flow raw; sleep flow timeout ] >>= function | Ok () -> go (acc + len) rest | Error `Closed -> flow.wr_closed <- true ; @@ -155,11 +168,21 @@ end = struct | _ -> flow.rd_closed <- true ; flow.wr_closed <- true ; - Mimic.close flow.flow + Flow.close flow.flow +end + +module Server (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig + val server : sleep:sleep -> Runtime.t -> Flow.flow -> unit Lwt.t +end = struct + let src = Logs.Src.create "paf-server" + + module Log = (val Logs.src_log src : Logs.LOG) + + module Easy_flow = Make (Flow) + open Lwt.Infix let server ~sleep connection flow = - let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in - let flow = { flow; sleep; queue; rd_closed = false; wr_closed = false } in + Easy_flow.create ~sleep flow >>= fun flow -> let rd_exit, notify_rd_exit = Lwt.wait () in let wr_exit, notify_wr_exit = Lwt.wait () in let rec rd_fiber () = @@ -167,7 +190,7 @@ end = struct match Runtime.next_read_operation connection with | `Read -> Log.debug (fun m -> m "next read operation: `read") ; - recv flow ~read:(Runtime.read connection) + Easy_flow.recv flow ~read:(Runtime.read connection) ~read_eof:(Runtime.read_eof connection) >>= fun _ -> go () | `Yield -> @@ -177,8 +200,8 @@ end = struct | `Close -> Log.debug (fun m -> m "next read operation: `close") ; Lwt.wakeup_later notify_rd_exit () ; - flow.rd_closed <- true ; - safely_close flow in + flow.Easy_flow.rd_closed <- true ; + Easy_flow.safely_close flow in Lwt.async @@ fun () -> Lwt.catch go (fun exn -> Runtime.report_exn connection exn ; @@ -188,7 +211,7 @@ end = struct match Runtime.next_write_operation connection with | `Write iovecs -> Log.debug (fun m -> m "next write operation: `write") ; - send flow iovecs >>= fun res -> + Easy_flow.send flow iovecs >>= fun res -> Runtime.report_write_result connection res ; go () | `Yield -> @@ -198,8 +221,8 @@ end = struct | `Close _ -> Log.debug (fun m -> m "next write operation: `close") ; Lwt.wakeup_later notify_wr_exit () ; - flow.wr_closed <- true ; - safely_close flow in + flow.Easy_flow.wr_closed <- true ; + Easy_flow.safely_close flow in Lwt.async @@ fun () -> Lwt.catch go (fun exn -> (* Runtime.report_write_result connection `Closed ; *) @@ -209,11 +232,11 @@ end = struct wr_fiber () ; Lwt.join [ rd_exit; wr_exit ] >>= fun () -> Log.debug (fun m -> m "End of transmission.") ; - close flow + Easy_flow.close flow end -module Client (Runtime : RUNTIME) : sig - val run : sleep:sleep -> Runtime.t -> Mimic.flow -> unit Lwt.t +module Client (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig + val run : sleep:sleep -> Runtime.t -> Flow.flow -> unit Lwt.t end = struct open Lwt.Infix @@ -221,115 +244,10 @@ end = struct module Log = (val Logs.src_log src : Logs.LOG) - let blit src src_off dst dst_off len = - let dst = Cstruct.of_bigarray ~off:dst_off ~len dst in - Cstruct.blit src src_off dst 0 len - - type client = { - flow : Mimic.flow; - sleep : sleep; - queue : (char, Bigarray.int8_unsigned_elt) Ke.Rke.t; - mutable rd_closed : bool; - mutable wr_closed : bool; - } - - let safely_close flow = - if flow.rd_closed && flow.wr_closed - then ( - Log.debug (fun m -> m "Close the connection.") ; - Mimic.close flow.flow) - else Lwt.return () - - let recv flow ~read ~read_eof = - Mimic.read flow.flow >>= function - | Error _ | Ok `Eof -> - flow.rd_closed <- true ; - safely_close flow >>= fun () -> - let _shift = - match - Ke.Rke.compress flow.queue ; - Ke.Rke.N.peek flow.queue - with - | [] -> read_eof Bigstringaf.empty ~off:0 ~len:0 - | [ slice ] -> read_eof slice ~off:0 ~len:(Bigstringaf.length slice) - | _ -> assert false - (* XXX(dinosaure): impossible due to [compress]. *) in - Lwt.return `Closed - | Ok (`Data v) -> - let len = Cstruct.len v in - Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v ; - let[@warning "-8"] (slice :: _) = Ke.Rke.N.peek flow.queue in - let shift = read slice ~off:0 ~len:(Bigstringaf.length slice) in - Ke.Rke.N.shift_exn flow.queue shift ; - Lwt.return `Continue - - (* - let rec recv flow ~read ~read_eof = - match Ke.Rke.N.peek flow.queue with - | [] -> ( - if flow.rd_closed - then - let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in - Lwt.return `Closed - else - Mimic.read flow.flow >>= function - | Error (#Mimic.error as err) -> - flow.rd_closed <- true ; - safely_close flow >>= fun () -> raise (Flow err) - | Ok `Eof -> - let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in - Log.debug (fun m -> m "[`read] Connection closed.") ; - flow.rd_closed <- true ; - safely_close flow >>= fun () -> Lwt.return `Closed - | Ok (`Data v) -> - let len = Cstruct.len v in - Log.debug (fun m -> m "<- %d byte(s)" len) ; - let _ = - Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v - in - recv flow ~read ~read_eof) - | src :: _ -> - let len = Bigstringaf.length src in - let shift = read src ~off:0 ~len in - Log.debug (fun m -> m "[`read] shift %d/%d byte(s)" shift len) ; - Ke.Rke.N.shift_exn flow.queue shift ; - if shift = 0 then Ke.Rke.compress flow.queue ; - Lwt.return `Continue - *) - - let sleep (flow : client) timeout = - flow.sleep timeout >>= fun () -> Lwt.return (Error `Closed) - - let writev ?(timeout = 5_000_000_000L) flow iovecs = - let rec go acc = function - | [] -> Lwt.return (`Ok acc) - | { Faraday.buffer; off; len } :: rest -> ( - let raw = Cstruct.of_bigarray buffer ~off ~len in - Lwt.pick [ Mimic.write flow.flow raw; sleep flow timeout ] - >>= function - | Ok () -> go (acc + len) rest - | Error `Closed -> - flow.wr_closed <- true ; - safely_close flow >>= fun () -> Lwt.return `Closed - | Error _ -> assert false) in - go 0 iovecs - - let send flow iovecs = - if flow.wr_closed - then safely_close flow >>= fun () -> Lwt.return `Closed - else writev flow iovecs - - let close flow = - match (flow.rd_closed, flow.wr_closed) with - | true, true -> Lwt.return_unit - | _ -> - flow.rd_closed <- true ; - flow.wr_closed <- true ; - Mimic.close flow.flow + module Easy_flow = Make (Flow) let run ~sleep connection flow = - let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in - let flow = { flow; sleep; queue; rd_closed = false; wr_closed = false } in + Easy_flow.create ~sleep flow >>= fun flow -> let rd_exit, notify_rd_exit = Lwt.wait () in let wr_exit, notify_wr_exit = Lwt.wait () in @@ -338,7 +256,7 @@ end = struct match Runtime.next_read_operation connection with | `Read -> Log.debug (fun m -> m "[`read] start to read.") ; - recv flow ~read:(Runtime.read connection) + Easy_flow.recv flow ~read:(Runtime.read connection) ~read_eof:(Runtime.read_eof connection) >>= fun _ -> go () | `Yield -> @@ -348,8 +266,8 @@ end = struct | `Close -> Log.debug (fun m -> m "[`read] close the connection.") ; Lwt.wakeup_later notify_rd_exit () ; - flow.rd_closed <- true ; - safely_close flow in + flow.Easy_flow.rd_closed <- true ; + Easy_flow.safely_close flow in Lwt.async @@ fun () -> Lwt.catch go (fun exn -> Runtime.report_exn connection exn ; @@ -359,7 +277,7 @@ end = struct match Runtime.next_write_operation connection with | `Write iovecs -> Log.debug (fun m -> m "[`write] start to write.") ; - send flow iovecs >>= fun res -> + Easy_flow.send flow iovecs >>= fun res -> Runtime.report_write_result connection res ; go () | `Yield -> @@ -377,7 +295,7 @@ end = struct Lwt.return ()) in wr_loop () ; rd_loop () ; - Lwt.join [ rd_exit; wr_exit ] >>= fun () -> close flow + Lwt.join [ rd_exit; wr_exit ] >>= fun () -> Easy_flow.close flow end type impl = Runtime : 'conn runtime * 'conn -> impl @@ -432,7 +350,7 @@ let serve_when_ready : let server : type t. t runtime -> sleep:sleep -> t -> Mimic.flow -> unit Lwt.t = fun (module Runtime) ~sleep conn flow -> - let module Server = Server (Runtime) in + let module Server = Server (Mimic) (Runtime) in Server.server ~sleep conn flow let serve ~sleep ?stop service t = @@ -445,5 +363,5 @@ let serve ~sleep ?stop service t = let run : type t. t runtime -> sleep:sleep -> t -> Mimic.flow -> unit Lwt.t = fun (module Runtime) ~sleep conn flow -> - let module Client = Client (Runtime) in + let module Client = Client (Mimic) (Runtime) in Client.run ~sleep conn flow diff --git a/lib/paf.mli b/lib/paf.mli index ed0528a..f057936 100644 --- a/lib/paf.mli +++ b/lib/paf.mli @@ -60,8 +60,6 @@ module type RUNTIME = sig val shutdown : t -> unit end -exception Flow of Mimic.error - type 'conn runtime = (module RUNTIME with type t = 'conn) type impl = Runtime : 'conn runtime * 'conn -> impl diff --git a/test/simple_client.ml b/test/simple_client.ml index b1218dd..6ead066 100644 --- a/test/simple_client.ml +++ b/test/simple_client.ml @@ -60,8 +60,6 @@ let failf fmt = Format.kasprintf (fun err -> raise (Failure err)) fmt let error_handler wk _ (err : Alpn.client_error) = Lwt.wakeup_later wk (err :> [ `Body of string | `Done | Alpn.client_error ]) ; match err with - | `Exn (Paf.Flow err) -> - failf "Impossible to start a transmission: %a" Mimic.pp_error err | `Invalid_response_body_length_v1 _ | `Invalid_response_body_length_v2 _ -> failf "Invalid response body-length" | `Malformed_response _ -> failf "Malformed response" diff --git a/test/simple_server.ml b/test/simple_server.ml index ce63ed7..700633a 100644 --- a/test/simple_server.ml +++ b/test/simple_server.ml @@ -118,19 +118,8 @@ let request_handler large (ip, port) reqd = http_large large (ip, port) (Reqd.request_body reqd) oc | _ -> assert false -let error_handler (ip, port) ?request:_ error respond = - let open Httpaf in +let error_handler _ ?request:_ error _respond = match error with - | `Exn (Paf.Flow err) -> - let contents = - Fmt.strf "Internal server error from <%a:%d>: %a" Ipaddr.pp ip port - Mimic.pp_error err in - let headers = - Headers.of_list - [ ("content-length", string_of_int (String.length contents)) ] in - let body = respond headers in - Body.write_string body contents ; - Body.close_writer body | `Exn _exn -> Printexc.print_backtrace stderr | `Bad_gateway -> Fmt.epr "Got a bad gateway error.\n%!" | `Bad_request -> Fmt.epr "Got a bad request error.\n%!"