diff --git a/src/conduit-lwt-unix/conduit_lwt_server.ml b/src/conduit-lwt-unix/conduit_lwt_server.ml index 7184d56b..cdf4a804 100644 --- a/src/conduit-lwt-unix/conduit_lwt_server.ml +++ b/src/conduit-lwt-unix/conduit_lwt_server.ml @@ -71,34 +71,41 @@ let run_handler handler v = f "Uncaught exception in handler: %s" (Printexc.to_string ex))); Lwt.return_unit)) -let init ?(stop = fst (Lwt.wait ())) handler fd = +let init ?(nconn = 10_000) ?(stop = fst (Lwt.wait ())) handler fd = let stop = Lwt.map (fun () -> `Stop) stop in + let log_exn = function + | Some ex -> + Log.warn (fun f -> + f "Uncaught exception accepting connection: %s" + (Printexc.to_string ex)) + | None -> () + in let rec loop () = - Lwt.try_bind + let accepted = + Lwt_unix.accept_n fd nconn >>= fun (connections, maybe_error) -> + log_exn maybe_error; + Lwt.return (`Accepted connections) + in + Lwt.catch (fun () -> - connected (); - throttle () >>= fun () -> - let accept = Lwt.map (fun v -> `Accept v) (Lwt_unix.accept fd) in - Lwt.choose [ accept; stop ] >|= function - | `Stop -> - Lwt.cancel accept; - `Stop - | `Accept _ as x -> x) - (function + Lwt.choose [ accepted; stop ] >>= function | `Stop -> - disconnected (); + Lwt.cancel accepted; Lwt.return_unit - | `Accept v -> - run_handler handler v; - loop ()) - (fun exn -> - disconnected (); - match exn with + | `Accepted connections -> + Lwt_list.iter_s + (fun v -> + connected (); + throttle () >>= fun () -> + run_handler handler v; + Lwt.return_unit) + connections + >>= Lwt.pause + >>= loop) + (function | Lwt.Canceled -> Lwt.return_unit | ex -> - Log.warn (fun f -> - f "Uncaught exception accepting connection: %s" - (Printexc.to_string ex)); - Lwt_unix.yield () >>= loop) + log_exn (Some ex); + Lwt.pause () >>= loop) in Lwt.finalize loop (fun () -> Lwt_unix.close fd) diff --git a/src/conduit-lwt-unix/conduit_lwt_server.mli b/src/conduit-lwt-unix/conduit_lwt_server.mli index f9897881..0f8a5b3c 100644 --- a/src/conduit-lwt-unix/conduit_lwt_server.mli +++ b/src/conduit-lwt-unix/conduit_lwt_server.mli @@ -10,6 +10,7 @@ val process_accept : unit Lwt.t val init : + ?nconn:int -> ?stop:unit Lwt.t -> (Lwt_unix.file_descr * Lwt_unix.sockaddr -> unit Lwt.t) -> Lwt_unix.file_descr ->