Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 88 additions & 170 deletions lib/paf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,79 +64,92 @@ type sleep = int64 -> unit Lwt.t

type 'conn runtime = (module RUNTIME with type t = 'conn)

exception Flow of Mimic.error

module Server (Runtime : RUNTIME) : sig
val server : sleep:sleep -> Runtime.t -> Mimic.flow -> unit Lwt.t
end = struct
let src = Logs.Src.create "paf"
module Make (Flow : Mirage_flow.S) = struct
let src = Logs.Src.create "paf-flow"

module Log = (val Logs.src_log src : Logs.LOG)

type server = {
flow : Mimic.flow;
type flow = {
flow : Flow.flow;
sleep : sleep;
queue : (char, Bigarray.int8_unsigned_elt) Ke.Rke.t;
mutable rd_closed : bool;
mutable wr_closed : bool;
}

open Rresult
open Lwt.Infix
let create ~sleep flow =
let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
Lwt.return { flow; sleep; queue; rd_closed = false; wr_closed = false }

let safely_close flow =
if flow.rd_closed && flow.wr_closed
then (
Log.debug (fun m -> m "Close the connection.") ;
Mimic.close flow.flow)
Flow.close flow.flow)
else Lwt.return ()

let blit src src_off dst dst_off len =
let dst = Cstruct.of_bigarray ~off:dst_off ~len dst in
Cstruct.blit src src_off dst 0 len

let rec recv flow ~read ~read_eof =
match Ke.Rke.N.peek flow.queue with
| [] -> (
if flow.rd_closed
then
let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in
Lwt.return `Closed
else
Mimic.read flow.flow >>= function
| Error (#Mimic.error as err) ->
flow.rd_closed <- true ;
safely_close flow >>= fun () -> raise (Flow err)
| Ok `Eof ->
let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in
Log.debug (fun m -> m "[`read] Connection closed.") ;
flow.rd_closed <- true ;
safely_close flow >>= fun () -> Lwt.return `Closed
| Ok (`Data v) ->
let len = Cstruct.len v in
Log.debug (fun m -> m "<- %d byte(s)" len) ;
let _ =
Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v
in
recv flow ~read ~read_eof)
| src :: _ ->
let len = Bigstringaf.length src in
let shift = read src ~off:0 ~len in
Log.debug (fun m -> m "[`read] shift %d/%d byte(s)" shift len) ;
open Lwt.Infix

let recv flow ~read ~read_eof =
(* match Ke.Rke.N.peek flow.queue with
| src :: _ ->
let len = Bigstringaf.length src in
let shift = read src ~off:0 ~len in
Ke.Rke.N.shift_exn flow.queue shift ;
Lwt.return `Continue
| [] when flow.rd_closed ->
let _ = read_eof Bigstringaf.empty ~off:0 ~len:0 in
Lwt.return `Closed
| [] -> *)
Ke.Rke.compress flow.queue ;
Flow.read flow.flow >>= function
| Error _ | Ok `Eof ->
flow.rd_closed <- true ;
safely_close flow >>= fun () ->
let _shift =
match
Ke.Rke.compress flow.queue ;
Ke.Rke.N.peek flow.queue
with
| [] -> read_eof Bigstringaf.empty ~off:0 ~len:0
| [ slice ] -> read_eof slice ~off:0 ~len:(Bigstringaf.length slice)
| _ -> assert false
(* XXX(dinosaure): impossible due to [compress]. *) in
Lwt.return `Closed
| Ok (`Data v) ->
let len = Cstruct.len v in
Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v ;
let[@warning "-8"] (slice :: _) = Ke.Rke.N.peek flow.queue in
let shift = read slice ~off:0 ~len:(Bigstringaf.length slice) in
Ke.Rke.N.shift_exn flow.queue shift ;
if shift = 0 then Ke.Rke.compress flow.queue ;
Lwt.return `Continue

let sleep (flow : server) timeout =
(* XXX(dinosaure): semantically, this is the closer impl. of [recv] if we
* compare with HTTP/AF. [compress] is called before any [read] and it ensures
* some assumptions needed by HTTP/AF (or Angstrom) to parse requests.
*
* Indeed, without [compress] at the beginning, it seems that HTTP/AF is not
* able to decide to close the connection.
*
* On the other side, introspect [flow.queue] before and gives slices and limit
* calls to [read] can finish to a situation with ["\r\n"] into the queue and
* HTTP/AF is not able to shift nor to finalize.
*
* In others words, [compress] seems the key to ensure that we deliver something
* good for HTTP/AF to terminate or not the connection properly. *)

let sleep flow timeout =
flow.sleep timeout >>= fun () -> Lwt.return (Error `Closed)

let writev ?(timeout = 5_000_000_000L) flow iovecs =
let rec go acc = function
| [] -> Lwt.return (`Ok acc)
| { Faraday.buffer; off; len } :: rest -> (
let raw = Cstruct.of_bigarray buffer ~off ~len in
Lwt.pick [ Mimic.write flow.flow raw; sleep flow timeout ]
>>= function
Lwt.pick [ Flow.write flow.flow raw; sleep flow timeout ] >>= function
| Ok () -> go (acc + len) rest
| Error `Closed ->
flow.wr_closed <- true ;
Expand All @@ -155,19 +168,29 @@ end = struct
| _ ->
flow.rd_closed <- true ;
flow.wr_closed <- true ;
Mimic.close flow.flow
Flow.close flow.flow
end

module Server (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig
val server : sleep:sleep -> Runtime.t -> Flow.flow -> unit Lwt.t
end = struct
let src = Logs.Src.create "paf-server"

module Log = (val Logs.src_log src : Logs.LOG)

module Easy_flow = Make (Flow)
open Lwt.Infix

let server ~sleep connection flow =
let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
let flow = { flow; sleep; queue; rd_closed = false; wr_closed = false } in
Easy_flow.create ~sleep flow >>= fun flow ->
let rd_exit, notify_rd_exit = Lwt.wait () in
let wr_exit, notify_wr_exit = Lwt.wait () in
let rec rd_fiber () =
let rec go () =
match Runtime.next_read_operation connection with
| `Read ->
Log.debug (fun m -> m "next read operation: `read") ;
recv flow ~read:(Runtime.read connection)
Easy_flow.recv flow ~read:(Runtime.read connection)
~read_eof:(Runtime.read_eof connection)
>>= fun _ -> go ()
| `Yield ->
Expand All @@ -177,8 +200,8 @@ end = struct
| `Close ->
Log.debug (fun m -> m "next read operation: `close") ;
Lwt.wakeup_later notify_rd_exit () ;
flow.rd_closed <- true ;
safely_close flow in
flow.Easy_flow.rd_closed <- true ;
Easy_flow.safely_close flow in
Lwt.async @@ fun () ->
Lwt.catch go (fun exn ->
Runtime.report_exn connection exn ;
Expand All @@ -188,7 +211,7 @@ end = struct
match Runtime.next_write_operation connection with
| `Write iovecs ->
Log.debug (fun m -> m "next write operation: `write") ;
send flow iovecs >>= fun res ->
Easy_flow.send flow iovecs >>= fun res ->
Runtime.report_write_result connection res ;
go ()
| `Yield ->
Expand All @@ -198,8 +221,8 @@ end = struct
| `Close _ ->
Log.debug (fun m -> m "next write operation: `close") ;
Lwt.wakeup_later notify_wr_exit () ;
flow.wr_closed <- true ;
safely_close flow in
flow.Easy_flow.wr_closed <- true ;
Easy_flow.safely_close flow in
Lwt.async @@ fun () ->
Lwt.catch go (fun exn ->
(* Runtime.report_write_result connection `Closed ; *)
Expand All @@ -209,127 +232,22 @@ end = struct
wr_fiber () ;
Lwt.join [ rd_exit; wr_exit ] >>= fun () ->
Log.debug (fun m -> m "End of transmission.") ;
close flow
Easy_flow.close flow
end

module Client (Runtime : RUNTIME) : sig
val run : sleep:sleep -> Runtime.t -> Mimic.flow -> unit Lwt.t
module Client (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig
val run : sleep:sleep -> Runtime.t -> Flow.flow -> unit Lwt.t
end = struct
open Lwt.Infix

let src = Logs.Src.create "paf"

module Log = (val Logs.src_log src : Logs.LOG)

let blit src src_off dst dst_off len =
let dst = Cstruct.of_bigarray ~off:dst_off ~len dst in
Cstruct.blit src src_off dst 0 len

type client = {
flow : Mimic.flow;
sleep : sleep;
queue : (char, Bigarray.int8_unsigned_elt) Ke.Rke.t;
mutable rd_closed : bool;
mutable wr_closed : bool;
}

let safely_close flow =
if flow.rd_closed && flow.wr_closed
then (
Log.debug (fun m -> m "Close the connection.") ;
Mimic.close flow.flow)
else Lwt.return ()

let recv flow ~read ~read_eof =
Mimic.read flow.flow >>= function
| Error _ | Ok `Eof ->
flow.rd_closed <- true ;
safely_close flow >>= fun () ->
let _shift =
match
Ke.Rke.compress flow.queue ;
Ke.Rke.N.peek flow.queue
with
| [] -> read_eof Bigstringaf.empty ~off:0 ~len:0
| [ slice ] -> read_eof slice ~off:0 ~len:(Bigstringaf.length slice)
| _ -> assert false
(* XXX(dinosaure): impossible due to [compress]. *) in
Lwt.return `Closed
| Ok (`Data v) ->
let len = Cstruct.len v in
Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v ;
let[@warning "-8"] (slice :: _) = Ke.Rke.N.peek flow.queue in
let shift = read slice ~off:0 ~len:(Bigstringaf.length slice) in
Ke.Rke.N.shift_exn flow.queue shift ;
Lwt.return `Continue

(*
let rec recv flow ~read ~read_eof =
match Ke.Rke.N.peek flow.queue with
| [] -> (
if flow.rd_closed
then
let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in
Lwt.return `Closed
else
Mimic.read flow.flow >>= function
| Error (#Mimic.error as err) ->
flow.rd_closed <- true ;
safely_close flow >>= fun () -> raise (Flow err)
| Ok `Eof ->
let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in
Log.debug (fun m -> m "[`read] Connection closed.") ;
flow.rd_closed <- true ;
safely_close flow >>= fun () -> Lwt.return `Closed
| Ok (`Data v) ->
let len = Cstruct.len v in
Log.debug (fun m -> m "<- %d byte(s)" len) ;
let _ =
Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v
in
recv flow ~read ~read_eof)
| src :: _ ->
let len = Bigstringaf.length src in
let shift = read src ~off:0 ~len in
Log.debug (fun m -> m "[`read] shift %d/%d byte(s)" shift len) ;
Ke.Rke.N.shift_exn flow.queue shift ;
if shift = 0 then Ke.Rke.compress flow.queue ;
Lwt.return `Continue
*)

let sleep (flow : client) timeout =
flow.sleep timeout >>= fun () -> Lwt.return (Error `Closed)

let writev ?(timeout = 5_000_000_000L) flow iovecs =
let rec go acc = function
| [] -> Lwt.return (`Ok acc)
| { Faraday.buffer; off; len } :: rest -> (
let raw = Cstruct.of_bigarray buffer ~off ~len in
Lwt.pick [ Mimic.write flow.flow raw; sleep flow timeout ]
>>= function
| Ok () -> go (acc + len) rest
| Error `Closed ->
flow.wr_closed <- true ;
safely_close flow >>= fun () -> Lwt.return `Closed
| Error _ -> assert false) in
go 0 iovecs

let send flow iovecs =
if flow.wr_closed
then safely_close flow >>= fun () -> Lwt.return `Closed
else writev flow iovecs

let close flow =
match (flow.rd_closed, flow.wr_closed) with
| true, true -> Lwt.return_unit
| _ ->
flow.rd_closed <- true ;
flow.wr_closed <- true ;
Mimic.close flow.flow
module Easy_flow = Make (Flow)

let run ~sleep connection flow =
let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
let flow = { flow; sleep; queue; rd_closed = false; wr_closed = false } in
Easy_flow.create ~sleep flow >>= fun flow ->
let rd_exit, notify_rd_exit = Lwt.wait () in
let wr_exit, notify_wr_exit = Lwt.wait () in

Expand All @@ -338,7 +256,7 @@ end = struct
match Runtime.next_read_operation connection with
| `Read ->
Log.debug (fun m -> m "[`read] start to read.") ;
recv flow ~read:(Runtime.read connection)
Easy_flow.recv flow ~read:(Runtime.read connection)
~read_eof:(Runtime.read_eof connection)
>>= fun _ -> go ()
| `Yield ->
Expand All @@ -348,8 +266,8 @@ end = struct
| `Close ->
Log.debug (fun m -> m "[`read] close the connection.") ;
Lwt.wakeup_later notify_rd_exit () ;
flow.rd_closed <- true ;
safely_close flow in
flow.Easy_flow.rd_closed <- true ;
Easy_flow.safely_close flow in
Lwt.async @@ fun () ->
Lwt.catch go (fun exn ->
Runtime.report_exn connection exn ;
Expand All @@ -359,7 +277,7 @@ end = struct
match Runtime.next_write_operation connection with
| `Write iovecs ->
Log.debug (fun m -> m "[`write] start to write.") ;
send flow iovecs >>= fun res ->
Easy_flow.send flow iovecs >>= fun res ->
Runtime.report_write_result connection res ;
go ()
| `Yield ->
Expand All @@ -377,7 +295,7 @@ end = struct
Lwt.return ()) in
wr_loop () ;
rd_loop () ;
Lwt.join [ rd_exit; wr_exit ] >>= fun () -> close flow
Lwt.join [ rd_exit; wr_exit ] >>= fun () -> Easy_flow.close flow
end

type impl = Runtime : 'conn runtime * 'conn -> impl
Expand Down Expand Up @@ -432,7 +350,7 @@ let serve_when_ready :

let server : type t. t runtime -> sleep:sleep -> t -> Mimic.flow -> unit Lwt.t =
fun (module Runtime) ~sleep conn flow ->
let module Server = Server (Runtime) in
let module Server = Server (Mimic) (Runtime) in
Server.server ~sleep conn flow

let serve ~sleep ?stop service t =
Expand All @@ -445,5 +363,5 @@ let serve ~sleep ?stop service t =

let run : type t. t runtime -> sleep:sleep -> t -> Mimic.flow -> unit Lwt.t =
fun (module Runtime) ~sleep conn flow ->
let module Client = Client (Runtime) in
let module Client = Client (Mimic) (Runtime) in
Client.run ~sleep conn flow
2 changes: 0 additions & 2 deletions lib/paf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ module type RUNTIME = sig
val shutdown : t -> unit
end

exception Flow of Mimic.error

type 'conn runtime = (module RUNTIME with type t = 'conn)

type impl = Runtime : 'conn runtime * 'conn -> impl
Expand Down
Loading