Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scheduler/cluster_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ module Pool_api = struct
match Pool.register t.pool ~name ~capacity with
| Error `Name_taken as e ->
Log.warn (fun f -> f "Worker %S already registered!" name);
Capability.dec_ref worker;
e
| Ok q ->
Pool.set_active q true ~reason:Inactive_reasons.worker;
Expand Down
22 changes: 20 additions & 2 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,23 @@ let await_builder () =
Alcotest.(check string) "Check job worked" "Building on worker-1\nBuilding example\nJob succeeded\n" result;
Lwt.return_unit

(* Two workers register with the same name. *)
let already_registered () =
with_sched @@ fun ~admin:_ ~registry ->
let api = Cluster_api.Worker.local ~metrics:(fun _ -> assert false) ~self_update:(fun () -> assert false) in
let q1 = Cluster_api.Registration.register registry ~name:"worker-1" ~capacity:1 api in
Capability.wait_until_settled q1 >>= fun () ->
let q2 = Cluster_api.Registration.register registry ~name:"worker-1" ~capacity:1 api in
Capability.wait_until_settled q2 >>= fun () ->
let pp_err = Fmt.(option ~none:(unit "ok")) Capnp_rpc.Exception.pp in
let p1 = Fmt.strf "%a" pp_err (Capability.problem q1) in
let p2 = Fmt.strf "%a" pp_err (Capability.problem q2) in
Alcotest.(check string) "First worker connected" "ok" p1;
Alcotest.(check string) "Second was rejected" "Failed: Worker already registered!" p2;
Capability.dec_ref q1;
Capability.dec_ref api;
Lwt.return_unit

(* A single builder can't do all the jobs and they queue up. *)
let builder_capacity () =
let builder = Mock_builder.create () in
Expand Down Expand Up @@ -347,22 +364,23 @@ let clients () =
(fun () -> Alcotest.fail "Should have failed!")
(fun _ -> Lwt.return_unit)

let test_case name fn =
let test_case ?(expected_warnings=0) name fn =
Alcotest_lwt.test_case name `Quick @@ fun _ () ->
let problems = Logs.(warn_count () + err_count ()) in
fn () >>= fun () ->
Lwt.pause () >>= fun () ->
Gc.full_major ();
Lwt.pause () >|= fun () ->
let problems' = Logs.(warn_count () + err_count ()) in
Alcotest.(check int) "Check log for warnings" 0 (problems' - problems)
Alcotest.(check int) "Check log for warnings" expected_warnings (problems' - problems)

let () =
Lwt_main.run @@ Alcotest_lwt.run ~verbose "build-scheduler" [
"main", [
test_case "simple" simple;
test_case "fails" fails;
test_case "await_builder" await_builder;
test_case "already_registered" already_registered ~expected_warnings:1;
test_case "builder_capacity" builder_capacity;
test_case "network" network;
test_case "worker_disconnects" worker_disconnects;
Expand Down