Skip to content

Commit

Permalink
thread counter: futher split out api
Browse files Browse the repository at this point in the history
  • Loading branch information
just-max committed Jul 14, 2024
1 parent d5dccdd commit bdc7410
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions src/stdlib-variants/thread-counter/thread_counter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ module Counter = struct

let lock_if b m = if b then lock_mutex m else empty_context' ()

(* Note: we enforce that spawned threads don't raise uncaught exceptions,
(** Note: we enforce that spawned threads don't raise uncaught exceptions,
which in theory changes the semantics of threads. The value of being
able to report stray exceptions outweighs the slim chance anyone
would rely on being able to ignore exceptions in threads. *)
Expand Down Expand Up @@ -123,8 +123,9 @@ module Counter = struct
let get_thread_count group =
let< _ = lock_mutex group.owner.mut in group.thread_count

(** Wait for threads in a group to complete. Group must be finished first. *)
let join_group ~leftover_thread_limit ~timeout group =
(* group must be stopped first; busy waits to implement timeout *)
(* busy waits to implement timeout *)
d_ "join_group";

let _ =
Expand All @@ -141,16 +142,23 @@ module Counter = struct
in
loop ()

(** {1 High-level group operations} *)

type thread_group_err =
| ThreadLimitReached of int
| ThreadsLeftOver of { left_over : int; limit : int }
| ExceptionRaised of { main : bool; exn_info : Util.exn_info }

let spawn_thread_group_no_check ?thread_limit ?leftover_limit cnt f x =
(** Create a group that runs the given function, then sets the return value
as the return value of the group. *)
let spawn_thread_group ?thread_limit cnt f x =
let group = create_group ?thread_limit cnt in

spawn_thread ~group cnt Util.(try_return group % try_to_result f) x |> ignore;
group

(** Wait for a group to finish then join its threads ({!join_group}). *)
let collect_thread_group ?leftover_limit group =
let cnt = group.owner in
let fin =
let rec loop () = match group.state with
| Running -> d_ "still running"; Condition.wait group.finished cnt.mut; loop ()
Expand All @@ -167,8 +175,9 @@ module Counter = struct

fin, leftover_count

let check_spawn_thread_group ?leftover_limit fin leftover_count =
let r = match[@warning "-4"] fin with
(** Check the return value of {!collect_thread_group} *)
let check_thread_group_result ?leftover_limit fin leftover_count =
let r = match fin with
| Return (Ok x) -> Ok x
| Return (Error e) -> Error [ExceptionRaised { main = true; exn_info = e }]
| Uncaught e -> Error [ExceptionRaised { main = false; exn_info = e }]
Expand All @@ -180,9 +189,11 @@ module Counter = struct
r |> add_err (ThreadsLeftOver { left_over = leftover_count ; limit = lim })
| _ -> r

let spawn_thread_group ?thread_limit ?leftover_limit cnt f x =
let fin, leftover_count = spawn_thread_group_no_check cnt f x ?thread_limit ?leftover_limit in
check_spawn_thread_group ?leftover_limit fin leftover_count
(** Combines {!spawn_thread_group}, {!collect_thread_group}, and {!check_thread_group_result}.*)
let run_in_thread_group ?thread_limit ?leftover_limit cnt f x =
let group = spawn_thread_group cnt f x ?thread_limit in
let fin, leftover_count = collect_thread_group group ?leftover_limit in
check_thread_group_result ?leftover_limit fin leftover_count

(* the rest is for error reporting, could do with less code/abstraction... *)

Expand Down

0 comments on commit bdc7410

Please sign in to comment.