diff --git a/lib/paf.ml b/lib/paf.ml index a012cde..6e3cc55 100644 --- a/lib/paf.ml +++ b/lib/paf.ml @@ -249,20 +249,24 @@ end = struct let rec really_recv flow ~read ~read_eof = Log.debug (fun m -> m "start to really [`read].") ; Mimic.read flow.flow >>= function - | Error err -> - Log.err (fun m -> m "[`read] got an error: %a." Mimic.pp_error err) ; - flow.rd_closed <- true ; - safely_close flow >>= fun () -> Lwt.return `Closed - | Ok `Eof -> - let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in - Log.debug (fun m -> m "[`read] Connection closed.") ; - flow.rd_closed <- true ; - safely_close flow >>= fun () -> Lwt.return `Closed | Ok (`Data v) -> let len = Cstruct.len v in Log.debug (fun m -> m "<- %d byte(s)" len) ; Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.len ~off:0 ~len v ; recv flow ~read ~read_eof + | Ok `Eof | Error _ -> ( + Ke.Rke.compress flow.queue ; + match Ke.Rke.N.peek flow.queue with + | [] -> + let _ (* 0 *) = read_eof Bigstringaf.empty ~off:0 ~len:0 in + Log.debug (fun m -> m "[`read] Connection closed.") ; + flow.rd_closed <- true ; + safely_close flow >>= fun () -> Lwt.return `Closed + | [ slice ] -> + let _ = read_eof slice ~off:0 ~len:(Bigstringaf.length slice) in + flow.rd_closed <- true ; + safely_close flow >>= fun () -> Lwt.return `Closed + | _ -> assert false) and recv flow ~read ~read_eof = match Ke.Rke.N.peek flow.queue with @@ -279,9 +283,7 @@ end = struct Log.debug (fun m -> m "[`read] shift %d/%d byte(s)" shift len) ; Ke.Rke.N.shift_exn flow.queue shift ; if shift = 0 - then ( - Ke.Rke.compress flow.queue ; - really_recv flow ~read ~read_eof) + then really_recv flow ~read ~read_eof else Lwt.return `Continue let drain flow ~read:_ ~read_eof = @@ -330,17 +332,11 @@ end = struct let rec rd_loop () = let rec go () = match Runtime.next_read_operation connection with - | `Read -> ( + | `Read -> Log.debug (fun m -> m "[`read] start to read.") ; let read = Runtime.read connection in let read_eof = Runtime.read_eof connection in - recv flow ~read ~read_eof >>= function - | `Closed -> - Log.err (fun m -> m "[`read] connection was closed by peer.") ; - Lwt.wakeup_later notify_read_loop_exited () ; - flow.rd_closed <- true ; - safely_close flow - | _ -> go ()) + recv flow ~read ~read_eof >>= fun _ -> go () | `Yield -> Log.debug (fun m -> m "next read operation: `yield") ; Runtime.yield_reader connection rd_loop ; @@ -415,8 +411,6 @@ let service connection accept close = Service { accept; connection; close } open Rresult open Lwt.Infix -let ( >>? ) = Lwt_result.bind - let serve_when_ready : type t flow. (t, flow, _) posix -> @@ -434,16 +428,17 @@ let serve_when_ready : Lwt.return_unit) ; t in let rec loop () = - let accept = accept t >>? fun flow -> Lwt.return_ok (`Flow flow) in - accept >>? function - | `Flow flow -> + accept t >>= function + | Ok flow -> Lwt.async (fun () -> handler flow) ; - Lwt.pause () >>= loop in + Lwt.pause () >>= loop + | Error `Closed -> Lwt.return_error `Closed + | Error _ -> Lwt.pause () >>= loop in let stop_result = Lwt.pick [ switched_off; loop () ] >>= function | Ok `Stopped -> close t >>= fun () -> Lwt.return_ok () | Error _ as err -> close t >>= fun () -> Lwt.return err in - stop_result >>= function Ok () | Error (`Closed | _) -> Lwt.return_unit) + stop_result >>= function Ok () | Error `Closed -> Lwt.return_unit) let server : type t. t runtime -> sleep:sleep -> t -> Mimic.flow -> unit Lwt.t = fun (module Runtime) ~sleep conn flow -> diff --git a/lib/paf_mirage.ml b/lib/paf_mirage.ml index 756bf3c..02bd55f 100644 --- a/lib/paf_mirage.ml +++ b/lib/paf_mirage.ml @@ -118,7 +118,9 @@ module Make (Time : Mirage_time.S) (Stack : Mirage_stack.V4V6) : Stack.TCP.create_connection t (ipaddr, port) >>= function | Error err -> Log.err (fun m -> - m "Got an error when we try to connect (TCP) to %a:%d: %a" + m + "Got an error when we try to connect to the server (TLS) to \ + %a:%d: %a" Ipaddr.pp ipaddr port Stack.TCP.pp_error err) ; Lwt.return_error (`Read err) | Ok flow -> @@ -181,9 +183,9 @@ module Make (Time : Mirage_time.S) (Stack : Mirage_stack.V4V6) : Lwt_mutex.unlock mutex ; accept t) - let close ({ stack; condition; _ } as t) = + let close ({ condition; _ } as t) = t.closed <- true ; - Stack.disconnect stack >>= fun () -> + (* Stack.disconnect stack >>= fun () -> *) Lwt_condition.signal condition () ; Lwt.return_unit @@ -206,11 +208,21 @@ module Make (Time : Mirage_time.S) (Stack : Mirage_stack.V4V6) : accept t >>= function | Error _ as err -> Lwt.return err | Ok flow -> ( - let dst = Stack.TCP.dst flow in + let ((ipaddr, port) as dst) = Stack.TCP.dst flow in TLS.server_of_flow tls flow >>= function | Ok flow -> Lwt.return_ok (dst, flow) - | Error _ as err -> Stack.TCP.close flow >>= fun () -> Lwt.return err) - in + | Error `Closed -> + (* XXX(dinosaure): be care! [`Closed] at this stage does not mean + * that the bound socket is closed but the socket with the peer is + * closed. *) + Lwt.return_error (`Write `Closed) + | Error err -> + Log.err (fun m -> + m + "Got an error when we try to connect to the client (TLS) \ + to %a:%d: %a" + Ipaddr.pp ipaddr port TLS.pp_write_error err) ; + Stack.TCP.close flow >>= fun () -> Lwt.return_error err) in let connection (dst, flow) = let error_handler = error_handler dst in let request_handler = request_handler dst in