Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions bin/worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ let update_docker () =
let update_normal () =
Lwt.return (fun () -> Lwt.return_unit)

let main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
let main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
setup_log ?style_renderer ?formatter level;
let update =
if Sys.file_exists "/.dockerenv" then update_docker
Expand All @@ -57,16 +57,16 @@ let main ?style_renderer level ?formatter registration_path capacity name allow_
Lwt_main.run begin
let vat = Capnp_rpc_unix.client_only_vat () in
let sr = Capnp_rpc_unix.Cap_file.load vat registration_path |> or_die in
Cluster_worker.run ~capacity ~name ~allow_push ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ~obuilder_prune_limit ?obuilder ~additional_metrics ~state_dir ~update sr
Cluster_worker.run ~capacity ~name ~allow_push ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_item_threshold ~obuilder_prune_limit ?obuilder ~additional_metrics ~state_dir ~update sr
end

(* Command-line parsing *)
let main ~install (style_renderer, args1) (level, args2) ((registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics), args3) =
let main ~install (style_renderer, args1) (level, args2) ((registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_item_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics), args3) =
if install then
Ok (Winsvc_wrapper.install name "OCluster Worker" "Run a build worker" (args1 @ args2 @ args3))
else
Ok (Winsvc_wrapper.run name state_dir (fun ?formatter () ->
main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics))
main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics))


open Cmdliner
Expand Down Expand Up @@ -130,6 +130,15 @@ let obuilder_prune_threshold =
~docs:"OBUILDER"
["obuilder-prune-threshold"]

let obuilder_prune_item_threshold =
Arg.value @@
Arg.opt Arg.(some int64) None @@
Arg.info
~doc:"If using OBuilder, this threshold is used to prune the stored builds if the number of cached steps exceeds this value."
~docv:"ITEMS"
~docs:"OBUILDER"
["obuilder-prune-item-threshold"]

let obuilder_prune_limit =
Arg.value @@
Arg.opt Arg.int 100 @@
Expand Down Expand Up @@ -193,11 +202,11 @@ module Obuilder_config = struct
end

let worker_opts_t =
let worker_opts registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
(registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics) in
let worker_opts registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
(registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_item_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics) in
Term.(with_used_args
(const worker_opts $ connect_addr $ capacity $ worker_name $ allow_push $ healthcheck_period
$ prune_threshold $ docker_max_df_size $ obuilder_prune_threshold $ obuilder_prune_limit $ state_dir $ Obuilder_config.v $ additional_metrics))
$ prune_threshold $ docker_max_df_size $ obuilder_prune_threshold $ obuilder_prune_item_threshold $ obuilder_prune_limit $ state_dir $ Obuilder_config.v $ additional_metrics))

let cmd ~install =
let doc = "Run a build worker" in
Expand Down
2 changes: 1 addition & 1 deletion current_ocluster.opam
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bug-reports: "https://github.com/ocurrent/ocluster/issues"
depends: [
"dune" {>= "3.7"}
"ocluster-api" {= version}
"ocaml" {>= "4.12.0"}
"ocaml" {>= "4.14.1"}
"capnp-rpc-unix" {>= "1.2.3"}
"current" {>= "0.6.4"}
"current_git" {>= "0.6.4"}
Expand Down
4 changes: 2 additions & 2 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
(synopsis "Cap'n Proto API for OCluster")
(description "OCaml bindings for the OCluster Cap'n Proto API.")
(depends
(ocaml (>= 4.12.0))
(ocaml (>= 4.14.1))
(capnp-rpc-lwt (>= 1.2.3))
fmt
(lwt (>= 5.6.1))
Expand Down Expand Up @@ -88,7 +88,7 @@
"Creates a stage in an OCurrent pipeline for submitting jobs to OCluster.")
(depends
(ocluster-api (= :version))
(ocaml (>= 4.12.0))
(ocaml (>= 4.14.1))
(capnp-rpc-unix (>= 1.2.3))
(current (>= 0.6.4))
(current_git (>= 0.6.4))
Expand Down
2 changes: 1 addition & 1 deletion ocluster-api.opam
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ doc: "https://ocurrent.github.io/ocluster/"
bug-reports: "https://github.com/ocurrent/ocluster/issues"
depends: [
"dune" {>= "3.7"}
"ocaml" {>= "4.12.0"}
"ocaml" {>= "4.14.1"}
"capnp-rpc-lwt" {>= "1.2.3"}
"fmt"
"lwt" {>= "5.6.1"}
Expand Down
4 changes: 2 additions & 2 deletions worker/cluster_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ let self_update ~update t =
Lwt_result.fail (`Msg (Printexc.to_string ex))
)

let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_limit ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service =
let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_item_threshold ?obuilder_prune_limit ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service =
begin match prune_threshold, docker_max_df_size with
| None, None -> Log.info (fun f -> f "Prune threshold not set and docker max df size is not. Will not check for low disk-space!")
| None, Some size -> Log.info (fun f -> f "Pruning docker whenever the memory used exceeds %3.2fGB" size)
Expand All @@ -495,7 +495,7 @@ let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?do
end;
begin match obuilder with
| None -> Lwt.return_none
| Some config -> Obuilder_build.create ?prune_threshold:obuilder_prune_threshold ?prune_limit:obuilder_prune_limit config >|= Option.some
| Some config -> Obuilder_build.create ?prune_threshold:obuilder_prune_threshold ?prune_item_threshold:obuilder_prune_item_threshold ?prune_limit:obuilder_prune_limit config >|= Option.some
end >>= fun obuilder ->
let build =
match build with
Expand Down
1 change: 1 addition & 0 deletions worker/cluster_worker.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ val run :
?prune_threshold:float ->
?docker_max_df_size:float ->
?obuilder_prune_threshold:float ->
?obuilder_prune_item_threshold:int64 ->
?obuilder_prune_limit:int ->
?obuilder:Obuilder_config.t ->
?additional_metrics:(string * Uri.t) list ->
Expand Down
48 changes: 27 additions & 21 deletions worker/obuilder_build.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ end

type t = {
builder : builder;
root : string;
mutable pruning : bool;
cond : unit Lwt_condition.t; (* Fires when we finish pruning *)
prune_threshold : float option;
prune_item_threshold : int64 option; (* Threshold number of items to hold in obuilder store *)
prune_limit : int option; (* Number of items to prune from obuilder when threshold is reached *)
}

Expand All @@ -37,7 +37,7 @@ let log_to log_data tag msg =
| `Note -> Log_data.info log_data "\027[01;2m\027[01;35m%a %s\027[0m" pp_timestamp (Unix.gettimeofday ()) msg
| `Output -> Log_data.write log_data msg

let create ?prune_threshold ?prune_limit config =
let create ?prune_threshold ?prune_item_threshold ?prune_limit config =
let { Config.store; sandbox_config } = config in
store >>= fun (Obuilder.Store_spec.Store ((module Store), store)) ->
begin match sandbox_config with
Expand All @@ -60,22 +60,25 @@ let create ?prune_threshold ?prune_limit config =
Log.info (fun f -> f "OBuilder self-test passed");
{
builder = Builder ((module Builder), builder);
root = Store.root store;
pruning = false;
prune_threshold;
prune_item_threshold;
prune_limit;
cond = Lwt_condition.create ();
}

(* Prune [t] until [path]'s free space rises above [prune_threshold]. *)
let do_prune ~path ~prune_threshold ~prune_limit t =
(* Prune [t] until free space rises above [prune_threshold]
or number of items falls below [prune_item_threshold]. *)
let do_prune ~prune_threshold ~prune_item_threshold ~prune_limit t =
let Builder ((module Builder), builder) = t.builder in
let rec aux () =
let stop = Unix.gettimeofday () -. prune_margin |> Unix.gmtime in
Builder.prune builder ~before:stop prune_limit >>= fun n ->
let free = Df.free_space_percent path in
Log.info (fun f -> f "OBuilder partition: %.0f%% free after pruning %d items" free n);
if free > prune_threshold then Lwt.return_unit (* Space problem is fixed! *)
Builder.df builder >>= fun free ->
let count = Builder.count builder in
Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items after pruning %d items" free count n);
if free > prune_threshold && count < prune_item_threshold
then Lwt.return_unit (* Space problem is fixed! *)
else if n < prune_limit then (
Log.warn (fun f -> f "Out of space, but nothing left to prune! (will wait and then retry)");
Lwt_unix.sleep 600.0 >>= aux
Expand All @@ -86,26 +89,29 @@ let do_prune ~path ~prune_threshold ~prune_limit t =
in
aux ()

(* Check the free space in [t]'s store.
If less than [t.prune_threshold], spawn a prune operation (if not already running).
(* Check the free space and/or number of items in [t]'s store.
If less than [t.prune_threshold] or items > [t.prune_item_threshold], spawn a prune operation (if not already running).
If less than half that is remaining, also wait for it to finish.
Returns once there is enough free space to proceed. *)
let check_free_space t =
match t.prune_threshold, t.prune_limit with
| None, None
| Some _, None
| None, Some _ -> Lwt.return_unit
| Some prune_threshold, Some prune_limit ->
let path = t.root in
let prune_limit = Option.value t.prune_limit ~default:100 in
let prune_threshold = Option.value t.prune_threshold ~default:0. in
let prune_item_threshold = Option.value t.prune_item_threshold ~default:Int64.max_int in
if prune_threshold = 0. && prune_item_threshold = Int64.max_int then
Lwt.return_unit (* No limits have been set *)
else
let Builder ((module Builder), builder) = t.builder in
let rec aux () =
let free = Df.free_space_percent path in
Log.info (fun f -> f "OBuilder partition: %.0f%% free" free);
(* If we're low on space, spawn a pruning thread. *)
if free < prune_threshold && t.pruning = false then (
Builder.df builder >>= fun free ->
let count = Builder.count builder in
Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items" free count);
(* If we're low on space, or over the threshold number of items spawn a pruning thread. *)
if ((prune_threshold > 0. && free < prune_threshold) ||
(prune_item_threshold < Int64.max_int && count > prune_item_threshold)) && not t.pruning then (
t.pruning <- true;
Lwt.async (fun () ->
Lwt.finalize
(fun () -> do_prune ~path ~prune_threshold ~prune_limit t)
(fun () -> do_prune ~prune_threshold ~prune_item_threshold ~prune_limit t)
(fun () ->
Lwt.pause () >|= fun () ->
t.pruning <- false;
Expand Down
2 changes: 1 addition & 1 deletion worker/obuilder_build.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module Config : sig
-> Obuilder.Store_spec.store Lwt.t -> t
end

val create : ?prune_threshold:float -> ?prune_limit:int -> Config.t -> t Lwt.t
val create : ?prune_threshold:float -> ?prune_item_threshold:int64 -> ?prune_limit:int -> Config.t -> t Lwt.t

val build : t ->
switch:Lwt_switch.t ->
Expand Down