Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions bin/admin.ml
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
open Lwt.Infix
open Capnp_rpc_lwt

let () =
Logging.init ()

let or_die = function
| Ok x -> x
| Error `Msg m -> failwith m
Expand All @@ -19,28 +16,28 @@ let run cap_path fn =
Printf.eprintf "%s\n%!" msg;
exit 1

let add_client cap_path id =
let add_client () cap_path id =
run cap_path @@ fun admin_service ->
Capability.with_ref (Cluster_api.Admin.add_client admin_service id) @@ fun client ->
Persistence.save_exn client >|= fun uri ->
print_endline (Uri.to_string uri)

let remove_client cap_path id =
let remove_client () cap_path id =
run cap_path @@ fun admin_service ->
Cluster_api.Admin.remove_client admin_service id

let list_clients cap_path =
let list_clients () cap_path =
run cap_path @@ fun admin_service ->
Cluster_api.Admin.list_clients admin_service >|= function
| [] -> Fmt.epr "No clients.@."
| clients -> List.iter print_endline clients

let set_rate cap_path pool_id client_id rate =
let set_rate () cap_path pool_id client_id rate =
run cap_path @@ fun admin_service ->
let pool = Cluster_api.Admin.pool admin_service pool_id in
Cluster_api.Pool_admin.set_rate pool ~client_id rate

let show cap_path pool =
let show () cap_path pool =
run cap_path @@ fun admin_service ->
match pool with
| None ->
Expand All @@ -60,7 +57,7 @@ let drain pool workers =
) in
Lwt.join jobs

let set_active active all auto_create wait cap_path pool worker =
let set_active active () all auto_create wait cap_path pool worker =
run cap_path @@ fun admin_service ->
Capability.with_ref (Cluster_api.Admin.pool admin_service pool) @@ fun pool ->
match worker with
Expand Down Expand Up @@ -106,7 +103,7 @@ let set_active active all auto_create wait cap_path pool worker =
let pp_worker_name f { Cluster_api.Pool_admin.name; _ } =
Fmt.string f name

let update cap_path pool worker =
let update () cap_path pool worker =
run cap_path @@ fun admin_service ->
Capability.with_ref (Cluster_api.Admin.pool admin_service pool) @@ fun pool ->
match worker with
Expand Down Expand Up @@ -150,7 +147,7 @@ let update cap_path pool worker =
Fmt.(list ~sep:cut pp_worker_name) disconnected
)

let forget cap_path pool worker =
let forget () cap_path pool worker =
run cap_path @@ fun admin_service ->
Capability.with_ref (Cluster_api.Admin.pool admin_service pool) @@ fun pool ->
match worker with
Expand Down Expand Up @@ -238,47 +235,47 @@ let wait =

let add_client =
let doc = "Create a new client endpoint for submitting jobs" in
Term.(const add_client $ connect_addr $ Arg.required (client_id ~pos:0)),
Term.(const add_client $ Logging.term $ connect_addr $ Arg.required (client_id ~pos:0)),
Term.info "add-client" ~doc

let remove_client =
let doc = "Unregister a client." in
Term.(const remove_client $ connect_addr $ Arg.required (client_id ~pos:0)),
Term.(const remove_client $ Logging.term $ connect_addr $ Arg.required (client_id ~pos:0)),
Term.info "remove-client" ~doc

let list_clients =
let doc = "List registered clients" in
Term.(const list_clients $ connect_addr),
Term.(const list_clients $ Logging.term $ connect_addr),
Term.info "list-clients" ~doc

let set_rate =
let doc = "Set expected number of parallel jobs for a pool/client combination" in
Term.(const set_rate $ connect_addr $ Arg.required pool_pos $ Arg.required (client_id ~pos:1) $ Arg.required (rate ~pos:2)),
Term.(const set_rate $ Logging.term $ connect_addr $ Arg.required pool_pos $ Arg.required (client_id ~pos:1) $ Arg.required (rate ~pos:2)),
Term.info "set-rate" ~doc

let show =
let doc = "Show information about a service, pool or worker" in
Term.(const show $ connect_addr $ Arg.value pool_pos),
Term.(const show $ Logging.term $ connect_addr $ Arg.value pool_pos),
Term.info "show" ~doc

let pause =
let doc = "Set a worker to be unavailable for further jobs" in
Term.(const (set_active false) $ all $ auto_create $ wait $ connect_addr $ Arg.required pool_pos $ worker),
Term.(const (set_active false) $ Logging.term $ all $ auto_create $ wait $ connect_addr $ Arg.required pool_pos $ worker),
Term.info "pause" ~doc

let unpause =
let doc = "Resume a paused worker" in
Term.(const (set_active true) $ all $ auto_create $ const false $ connect_addr $ Arg.required pool_pos $ worker),
Term.(const (set_active true) $ Logging.term $ all $ auto_create $ const false $ connect_addr $ Arg.required pool_pos $ worker),
Term.info "unpause" ~doc

let update =
let doc = "Drain and then update worker(s)" in
Term.(const update $ connect_addr $ Arg.required pool_pos $ worker),
Term.(const update $ Logging.term $ connect_addr $ Arg.required pool_pos $ worker),
Term.info "update" ~doc

let forget =
let doc = "Forget about an old worker" in
Term.(const forget $ connect_addr $ Arg.required pool_pos $ worker),
Term.(const forget $ Logging.term $ connect_addr $ Arg.required pool_pos $ worker),
Term.info "forget" ~doc

let cmds = [add_client; remove_client; list_clients; set_rate; show; pause; unpause; update; forget]
Expand Down
9 changes: 3 additions & 6 deletions bin/client.ml
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
open Lwt.Infix
open Capnp_rpc_lwt

let () =
Logging.init ()

let or_die = function
| Ok x -> x
| Error `Msg m -> failwith m
Expand Down Expand Up @@ -62,7 +59,7 @@ let read_whole_file path =
let len = in_channel_length ic in
really_input_string ic len

let submit { submission_path; pool; repository; commits; cache_hint; urgent; secrets } spec =
let submit () { submission_path; pool; repository; commits; cache_hint; urgent; secrets } spec =
let src =
match repository, commits with
| None, [] -> None
Expand Down Expand Up @@ -268,7 +265,7 @@ let submit_docker_options =

let submit_docker =
let doc = "Submit a Docker build to the scheduler" in
Term.(const submit $ submit_options_common $ submit_docker_options),
Term.(const submit $ Logging.term $ submit_options_common $ submit_docker_options),
Term.info "submit-docker" ~doc

let submit_obuilder_options =
Expand All @@ -279,7 +276,7 @@ let submit_obuilder_options =

let submit_obuilder =
let doc = "Submit an OBuilder build to the scheduler" in
Term.(const submit $ submit_options_common $ submit_obuilder_options),
Term.(const submit $ Logging.term $ submit_options_common $ submit_obuilder_options),
Term.info "submit-obuilder" ~doc

let cmds = [submit_docker; submit_obuilder]
Expand Down
3 changes: 2 additions & 1 deletion bin/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
(public_names ocluster-scheduler ocluster-client ocluster-worker ocluster-admin)
(package ocluster)
(names scheduler client worker admin)
(libraries dune-build-info ocluster-api logs.fmt fmt.tty capnp-rpc-unix cluster_scheduler cluster_worker prometheus-app.unix db))
(libraries dune-build-info ocluster-api logs.cli logs.fmt fmt.cli fmt.tty capnp-rpc-unix cluster_scheduler cluster_worker prometheus-app.unix db))

9 changes: 6 additions & 3 deletions bin/logging.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ let reporter =
in
{ Logs.report = report }

let init () =
Fmt_tty.setup_std_outputs ();
Logs.(set_level (Some Warning));
let init style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
(* Logs.Src.set_level Capnp_rpc.Debug.src (Some Debug); *)
Logs.set_reporter reporter

let term =
Cmdliner.Term.(const init $ Fmt_cli.style_renderer () $ Logs_cli.level ())
12 changes: 8 additions & 4 deletions bin/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ module Restorer = Capnp_rpc_net.Restorer

let ( / ) = Filename.concat

let () =
Prometheus_unix.Logging.init ()
let setup_log default_level =
Prometheus_unix.Logging.init ?default_level ();
()

let or_die = function
| Ok x -> x
Expand Down Expand Up @@ -84,7 +85,7 @@ let provision_client ~admin ~secrets_dir id =
Logs.app (fun f -> f "Wrote capability reference to %S" path)
)

let main capnp secrets_dir pools prometheus_config state_dir default_clients =
let main () capnp secrets_dir pools prometheus_config state_dir default_clients =
if not (dir_exists state_dir) then Unix.mkdir state_dir 0o755;
let db = Sqlite3.db_open (state_dir / "scheduler.db") in
Sqlite3.busy_timeout db 1000;
Expand Down Expand Up @@ -171,9 +172,12 @@ let default_clients =
~docv:"NAME"
["default-clients"]

let setup_log =
Term.(const setup_log $ Logs_cli.level ())

let cmd =
let doc = "Manage build workers" in
Term.(const main $ Capnp_rpc_unix.Vat_config.cmd $ secrets_dir $ pools $ listen_prometheus $ state_dir $ default_clients),
Term.(const main $ setup_log $ Capnp_rpc_unix.Vat_config.cmd $ secrets_dir $ pools $ listen_prometheus $ state_dir $ default_clients),
Term.info "ocluster-scheduler" ~doc ~version:Version.t

let () = Term.(exit @@ eval cmd)
12 changes: 8 additions & 4 deletions bin/worker.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
open Lwt.Infix

let () =
Prometheus_unix.Logging.init ()
let setup_log default_level =
Prometheus_unix.Logging.init ?default_level ();
()

let or_die = function
| Ok x -> x
Expand Down Expand Up @@ -40,7 +41,7 @@ let update_docker () =
let update_normal () =
Lwt.return (fun () -> Lwt.return ())

let main registration_path capacity name allow_push prune_threshold state_dir obuilder =
let main () registration_path capacity name allow_push prune_threshold state_dir obuilder =
let update =
if Sys.file_exists "/.dockerenv" then update_docker
else update_normal
Expand Down Expand Up @@ -123,9 +124,12 @@ module Obuilder_config = struct
Term.pure make $ Obuilder.Runc_sandbox.cmdliner $ store
end

let setup_log =
Term.(const setup_log $ Logs_cli.level ())

let cmd =
let doc = "Run a build worker" in
Term.(const main $ connect_addr $ capacity $ worker_name $ allow_push $ prune_threshold $ state_dir $ Obuilder_config.v),
Term.(const main $ setup_log $ connect_addr $ capacity $ worker_name $ allow_push $ prune_threshold $ state_dir $ Obuilder_config.v),
Term.info "ocluster-worker" ~doc ~version:Version.t

let () = Term.(exit @@ eval cmd)