Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions src/conduit-lwt-unix/conduit_lwt_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,41 @@ let run_handler handler v =
f "Uncaught exception in handler: %s" (Printexc.to_string ex)));
Lwt.return_unit))

let init ?(stop = fst (Lwt.wait ())) handler fd =
let init ?(nconn = 10_000) ?(stop = fst (Lwt.wait ())) handler fd =
let stop = Lwt.map (fun () -> `Stop) stop in
let log_exn = function
| Some ex ->
Log.warn (fun f ->
f "Uncaught exception accepting connection: %s"
(Printexc.to_string ex))
| None -> ()
in
let rec loop () =
Lwt.try_bind
let accepted =
Lwt_unix.accept_n fd nconn >>= fun (connections, maybe_error) ->
log_exn maybe_error;
Lwt.return (`Accepted connections)
in
Lwt.catch
(fun () ->
connected ();
throttle () >>= fun () ->
let accept = Lwt.map (fun v -> `Accept v) (Lwt_unix.accept fd) in
Lwt.choose [ accept; stop ] >|= function
| `Stop ->
Lwt.cancel accept;
`Stop
| `Accept _ as x -> x)
(function
Lwt.choose [ accepted; stop ] >>= function
| `Stop ->
disconnected ();
Lwt.cancel accepted;
Lwt.return_unit
| `Accept v ->
run_handler handler v;
loop ())
(fun exn ->
disconnected ();
match exn with
| `Accepted connections ->
Lwt_list.iter_s
(fun v ->
connected ();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may also be a race to update connected here since I am calling iter_p, right?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite see the race you're referring to here -- where is the iter_p?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am not mistaken iter_p runs the iteration in parallel, and connected () increments the active connection count without any protection (let connected () = incr active). My fear was that multiple connections executed in parallel could try to increment the variable simultaneously (and I don't want to put a mutex there).

throttle () >>= fun () ->
run_handler handler v;
Lwt.return_unit)
connections
>>= Lwt.pause
>>= loop)
(function
| Lwt.Canceled -> Lwt.return_unit
| ex ->
Log.warn (fun f ->
f "Uncaught exception accepting connection: %s"
(Printexc.to_string ex));
Lwt_unix.yield () >>= loop)
log_exn (Some ex);
Lwt.pause () >>= loop)
in
Lwt.finalize loop (fun () -> Lwt_unix.close fd)
1 change: 1 addition & 0 deletions src/conduit-lwt-unix/conduit_lwt_server.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ val process_accept :
unit Lwt.t

val init :
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should probably document this, I was first waiting to see if you are interested at all and what the review says about the implementation

?nconn:int ->
?stop:unit Lwt.t ->
(Lwt_unix.file_descr * Lwt_unix.sockaddr -> unit Lwt.t) ->
Lwt_unix.file_descr ->
Expand Down