Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 22 additions & 27 deletions lib/paf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -249,20 +249,24 @@ end = struct
let rec really_recv flow ~read ~read_eof =
Log.debug (fun m -> m "start to really [`read].") ;
Mimic.read flow.flow >>= function
| Error err ->
Log.err (fun m -> m "[`read] got an error: %a." Mimic.pp_error err) ;
flow.rd_closed <- true ;
safely_close flow >>= fun () -> Lwt.return `Closed
| Ok `Eof ->
let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in
Log.debug (fun m -> m "[`read] Connection closed.") ;
flow.rd_closed <- true ;
safely_close flow >>= fun () -> Lwt.return `Closed
| Ok (`Data v) ->
let len = Cstruct.len v in
Log.debug (fun m -> m "<- %d byte(s)" len) ;
Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v ;
recv flow ~read ~read_eof
| Ok `Eof | Error _ -> (
Ke.Rke.compress flow.queue ;
match Ke.Rke.N.peek flow.queue with
| [] ->
let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in
Log.debug (fun m -> m "[`read] Connection closed.") ;
flow.rd_closed <- true ;
safely_close flow >>= fun () -> Lwt.return `Closed
| [ slice ] ->
let _ = read_eof slice ~off:0 ~len:(Bigstringaf.length slice) in
flow.rd_closed <- true ;
safely_close flow >>= fun () -> Lwt.return `Closed
| _ -> assert false)

and recv flow ~read ~read_eof =
match Ke.Rke.N.peek flow.queue with
Expand All @@ -279,9 +283,7 @@ end = struct
Log.debug (fun m -> m "[`read] shift %d/%d byte(s)" shift len) ;
Ke.Rke.N.shift_exn flow.queue shift ;
if shift = 0
then (
Ke.Rke.compress flow.queue ;
really_recv flow ~read ~read_eof)
then really_recv flow ~read ~read_eof
else Lwt.return `Continue

let drain flow ~read:_ ~read_eof =
Expand Down Expand Up @@ -330,17 +332,11 @@ end = struct
let rec rd_loop () =
let rec go () =
match Runtime.next_read_operation connection with
| `Read -> (
| `Read ->
Log.debug (fun m -> m "[`read] start to read.") ;
let read = Runtime.read connection in
let read_eof = Runtime.read_eof connection in
recv flow ~read ~read_eof >>= function
| `Closed ->
Log.err (fun m -> m "[`read] connection was closed by peer.") ;
Lwt.wakeup_later notify_read_loop_exited () ;
flow.rd_closed <- true ;
safely_close flow
| _ -> go ())
recv flow ~read ~read_eof >>= fun _ -> go ()
| `Yield ->
Log.debug (fun m -> m "next read operation: `yield") ;
Runtime.yield_reader connection rd_loop ;
Expand Down Expand Up @@ -415,8 +411,6 @@ let service connection accept close = Service { accept; connection; close }
open Rresult
open Lwt.Infix

let ( >>? ) = Lwt_result.bind

let serve_when_ready :
type t flow.
(t, flow, _) posix ->
Expand All @@ -434,16 +428,17 @@ let serve_when_ready :
Lwt.return_unit) ;
t in
let rec loop () =
let accept = accept t >>? fun flow -> Lwt.return_ok (`Flow flow) in
accept >>? function
| `Flow flow ->
accept t >>= function
| Ok flow ->
Lwt.async (fun () -> handler flow) ;
Lwt.pause () >>= loop in
Lwt.pause () >>= loop
| Error `Closed -> Lwt.return_error `Closed
| Error _ -> Lwt.pause () >>= loop in
let stop_result =
Lwt.pick [ switched_off; loop () ] >>= function
| Ok `Stopped -> close t >>= fun () -> Lwt.return_ok ()
| Error _ as err -> close t >>= fun () -> Lwt.return err in
stop_result >>= function Ok () | Error (`Closed | _) -> Lwt.return_unit)
stop_result >>= function Ok () | Error `Closed -> Lwt.return_unit)

let server : type t. t runtime -> sleep:sleep -> t -> Mimic.flow -> unit Lwt.t =
fun (module Runtime) ~sleep conn flow ->
Expand Down
24 changes: 18 additions & 6 deletions lib/paf_mirage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ module Make (Time : Mirage_time.S) (Stack : Mirage_stack.V4V6) :
Stack.TCP.create_connection t (ipaddr, port) >>= function
| Error err ->
Log.err (fun m ->
m "Got an error when we try to connect (TCP) to %a:%d: %a"
m
"Got an error when we try to connect to the server (TLS) to \
%a:%d: %a"
Ipaddr.pp ipaddr port Stack.TCP.pp_error err) ;
Lwt.return_error (`Read err)
| Ok flow ->
Expand Down Expand Up @@ -181,9 +183,9 @@ module Make (Time : Mirage_time.S) (Stack : Mirage_stack.V4V6) :
Lwt_mutex.unlock mutex ;
accept t)

let close ({ stack; condition; _ } as t) =
let close ({ condition; _ } as t) =
t.closed <- true ;
Stack.disconnect stack >>= fun () ->
(* Stack.disconnect stack >>= fun () -> *)
Lwt_condition.signal condition () ;
Lwt.return_unit

Expand All @@ -206,11 +208,21 @@ module Make (Time : Mirage_time.S) (Stack : Mirage_stack.V4V6) :
accept t >>= function
| Error _ as err -> Lwt.return err
| Ok flow -> (
let dst = Stack.TCP.dst flow in
let ((ipaddr, port) as dst) = Stack.TCP.dst flow in
TLS.server_of_flow tls flow >>= function
| Ok flow -> Lwt.return_ok (dst, flow)
| Error _ as err -> Stack.TCP.close flow >>= fun () -> Lwt.return err)
in
| Error `Closed ->
(* XXX(dinosaure): be care! [`Closed] at this stage does not mean
* that the bound socket is closed but the socket with the peer is
* closed. *)
Lwt.return_error (`Write `Closed)
| Error err ->
Log.err (fun m ->
m
"Got an error when we try to connect to the client (TLS) \
to %a:%d: %a"
Ipaddr.pp ipaddr port TLS.pp_write_error err) ;
Stack.TCP.close flow >>= fun () -> Lwt.return_error err) in
let connection (dst, flow) =
let error_handler = error_handler dst in
let request_handler = request_handler dst in
Expand Down