diff --git a/Dockerfile b/Dockerfile index 45260226..ad7d6e8c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ -FROM ocurrent/opam:debian-10-ocaml-4.10@sha256:12fac4e3520657aa72074d2a3b2658776d4b3ce486c930099c996d478c3a2501 AS build +FROM ocaml/opam:debian-10-ocaml-4.12@sha256:ba38dbfbfa92b0dca7171ac33d81276e7209912baf26887ecc20c382d15f32e4 AS build RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev -y --no-install-recommends -RUN cd ~/opam-repository && git pull origin -q master && git reset --hard e132600d1ea27a5be1edfb0079a205ba05830b8e && opam update +RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 1bd3ea712eef197c62c6ab92b2642cc2cda11be2 && opam update COPY --chown=opam ocluster-api.opam ocluster.opam /src/ COPY --chown=opam obuilder/obuilder.opam obuilder/obuilder-spec.opam /src/obuilder/ RUN opam pin -yn /src/obuilder/ diff --git a/Dockerfile.worker b/Dockerfile.worker index bc742cfb..602ec676 100644 --- a/Dockerfile.worker +++ b/Dockerfile.worker @@ -1,6 +1,6 @@ -FROM ocurrent/opam:debian-10-ocaml-4.10@sha256:12fac4e3520657aa72074d2a3b2658776d4b3ce486c930099c996d478c3a2501 AS build +FROM ocaml/opam:debian-10-ocaml-4.12@sha256:ba38dbfbfa92b0dca7171ac33d81276e7209912baf26887ecc20c382d15f32e4 AS build RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev -y --no-install-recommends -RUN cd ~/opam-repository && git pull origin -q master && git reset --hard e132600d1ea27a5be1edfb0079a205ba05830b8e && opam update +RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 1bd3ea712eef197c62c6ab92b2642cc2cda11be2 && opam update COPY --chown=opam ocluster-api.opam ocluster.opam /src/ COPY --chown=opam obuilder/obuilder.opam obuilder/obuilder-spec.opam /src/obuilder/ RUN opam pin -yn /src/obuilder/ diff --git a/current_ocluster.opam b/current_ocluster.opam index ec51c2e3..469e78a6 100644 --- a/current_ocluster.opam +++ b/current_ocluster.opam @@ -8,7 +8,7 @@ authors: ["talex5@gmail.com"] homepage: "https://github.com/ocurrent/ocluster" bug-reports: "https://github.com/ocurrent/ocluster/issues" depends: [ - "dune" {>= "2.5"} + "dune" {>= "2.8"} "ppx_deriving" "ocluster-api" {= version} "lwt" @@ -21,9 +21,10 @@ depends: [ "prometheus" "ppx_deriving_yojson" "ocaml" {>= "4.10.0"} + "odoc" {with-doc} ] build: [ - ["dune" "subst"] {pinned} + ["dune" "subst"] {dev} [ "dune" "build" diff --git a/dune-project b/dune-project index 25b8df8a..12e4b7bf 100644 --- a/dune-project +++ b/dune-project @@ -1,4 +1,4 @@ -(lang dune 2.5) +(lang dune 2.8) (name ocluster) (formatting disabled) (generate_opam_files true) @@ -8,8 +8,7 @@ (package (name ocluster-api) (synopsis "Cap'n Proto API for OCluster") - (description - "OCaml bindings for the OCluster Cap'n Proto API.") + (description "OCaml bindings for the OCluster Cap'n Proto API.") (depends ppx_deriving lwt @@ -21,16 +20,9 @@ (name ocluster) (synopsis "Distribute build jobs to workers") (description -"OCluster manages a pool of build workers. -A build scheduler service accepts build jobs from clients and distributes them to worker machines using Cap'n Proto. -Workers register themselves by connecting to the scheduler (and workers do not need to be able to accept incoming network connections). - -The scheduler can manage multiple pools (e.g. `linux-x86_64` and `linux-arm32`). -Clients say which pool should handle their requests. -At the moment, two build types are provided: building a Dockerfile, or building an OBuilder spec. -In either case, the build may done in the context of some Git commit. -The scheduler tries to schedule similar builds on the same machine, to benefit from caching.") + "OCluster manages a pool of build workers.\nA build scheduler service accepts build jobs from clients and distributes them to worker machines using Cap'n Proto.\nWorkers register themselves by connecting to the scheduler (and workers do not need to be able to accept incoming network connections).\n\nThe scheduler can manage multiple pools (e.g. `linux-x86_64` and `linux-arm32`).\nClients say which pool should handle their requests.\nAt the moment, two build types are provided: building a Dockerfile, or building an OBuilder spec.\nIn either case, the build may done in the context of some Git commit.\nThe scheduler tries to schedule similar builds on the same machine, to benefit from caching.") (depends + (ppx_expect (>= v0.14.1)) prometheus ppx_sexp_conv dune-build-info @@ -58,7 +50,8 @@ The scheduler tries to schedule similar builds on the same machine, to benefit f (package (name current_ocluster) (synopsis "OCurrent plugin for OCluster builds") - (description "Creates a stage in an OCurrent pipeline for submitting jobs to OCluster.") + (description + "Creates a stage in an OCurrent pipeline for submitting jobs to OCluster.") (depends ppx_deriving (ocluster-api (= :version)) diff --git a/ocluster-api.opam b/ocluster-api.opam index f5d0f628..e675e8de 100644 --- a/ocluster-api.opam +++ b/ocluster-api.opam @@ -7,16 +7,17 @@ authors: ["talex5@gmail.com"] homepage: "https://github.com/ocurrent/ocluster" bug-reports: "https://github.com/ocurrent/ocluster/issues" depends: [ - "dune" {>= "2.5"} + "dune" {>= "2.8"} "ppx_deriving" "lwt" "capnp-rpc-lwt" {>= "0.9.0"} "fmt" "ppx_deriving_yojson" "ocaml" {>= "4.10.0"} + "odoc" {with-doc} ] build: [ - ["dune" "subst"] {pinned} + ["dune" "subst"] {dev} [ "dune" "build" diff --git a/ocluster.opam b/ocluster.opam index 3415c974..1e6aec5b 100644 --- a/ocluster.opam +++ b/ocluster.opam @@ -16,7 +16,8 @@ authors: ["talex5@gmail.com"] homepage: "https://github.com/ocurrent/ocluster" bug-reports: "https://github.com/ocurrent/ocluster/issues" depends: [ - "dune" {>= "2.5"} + "dune" {>= "2.8"} + "ppx_expect" {>= "v0.14.1"} "prometheus" "ppx_sexp_conv" "dune-build-info" @@ -41,9 +42,10 @@ depends: [ "current_ocluster" {= version & with-test} "alcotest" {>= "1.0.0" & with-test} "alcotest-lwt" {>= "1.0.1" & with-test} + "odoc" {with-doc} ] build: [ - ["dune" "subst"] {pinned} + ["dune" "subst"] {dev} [ "dune" "build" diff --git a/test/dune b/test/dune index 6f0e55b2..82359dfe 100644 --- a/test/dune +++ b/test/dune @@ -1,6 +1,7 @@ (test (name test) (package ocluster) + (modules test mock_builder mock_network test_plugin) (libraries alcotest-lwt capnp-rpc-net @@ -12,3 +13,17 @@ logs.fmt lwt.unix prometheus-app)) + +(library + (name ocluster_expect_tests) + (package ocluster) + (modules test_scheduling) + (inline_tests) + (preprocess (pps ppx_expect)) + (libraries + cluster_scheduler + db + fmt.tty + logs.fmt + lwt.unix + prometheus-app)) diff --git a/test/test.ml b/test/test.ml index 15fcd3af..a3c1863b 100644 --- a/test/test.ml +++ b/test/test.ml @@ -375,6 +375,5 @@ let () = test_case "admin" admin; test_case "clients" clients; ]; - "scheduling", Test_scheduling.suite; "plugin", Test_plugin.suite; ] diff --git a/test/test_scheduling.ml b/test/test_scheduling.ml index 6d99abf5..2b06defd 100644 --- a/test/test_scheduling.ml +++ b/test/test_scheduling.ml @@ -35,20 +35,47 @@ let job_state x = | Fail ex -> Error (Printexc.to_string ex) | Sleep -> Error "pending" -let pop_result = Alcotest.(result string string) +let pp_result = Fmt.(result ~ok:string ~error:string) +let pp_cancel = Fmt.(result ~ok:(const string "cancelled") ~error:(const string "error")) -let flush_queue w ~expect = - let rec aux = function - | [] -> Pool.release w; Lwt.return_unit - | x :: xs -> - let job = Pool.pop w in - Lwt.pause () >>= fun () -> - Alcotest.(check pop_result) ("Flush " ^ x) (Ok x) (job_state job); - aux xs - in - aux expect +let println fmt = Fmt.pr (fmt ^^ "@.") +let print_result = Fmt.pr "%a@." pp_result +let print_pool = Fmt.pr "%a@." Pool.dump + +let rec flush_queue w = + let job = Pool.pop w in + Lwt.pause () >>= fun () -> + if Lwt.is_sleeping job then ( + Pool.release w; + Lwt.return_unit + ) else ( + println "Flush %a" pp_result (job_state job); + flush_queue w + ) + +let run fn = + Lwt_main.run @@ begin + Lwt_unix.yield () >>= fun () -> (* Ensure we're inside the Lwt mainloop. Lwt.pause behaves strangely otherwise. *) + Fake_time.now := 1.0; + fn () >>= fun () -> + Lwt.pause () >|= fun () -> + Prometheus.CollectorRegistry.(collect default) + |> Fmt.to_to_string Prometheus_app.TextFormat_0_0_4.output + |> String.split_on_char '\n' + |> List.iter (fun line -> + if Astring.String.is_prefix ~affix:"scheduler_pool_" line then ( + match Astring.String.cut ~sep:"} " line with + | None -> Fmt.failwith "Bad metrics line: %S" line + | Some (key, _) when Astring.String.is_infix ~affix:"_total{" key -> () + | Some (key, value) -> + if float_of_string value <> 0.0 then + Fmt.failwith "Non-zero metric after test: %s=%s" key value + ) + ) + end let with_test_db fn = + run @@ fun () -> let db = Sqlite3.db_open ":memory:" in Lwt.finalize (fun () -> fn (Cluster_scheduler.Pool.Dao.init db)) @@ -59,7 +86,7 @@ let submit ~urgent client job = () (* Assign three jobs to two workers. *) -let simple () = +let%expect_test "simple" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"simple" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in @@ -73,13 +100,18 @@ let simple () = submit user ~urgent:false @@ job "job2"; submit user ~urgent:false @@ job "job3"; Lwt.pause () >>= fun () -> - Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "job1") (job_state w1a); - Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job2") (job_state w2a); + (* Worker 1 / job 1 *) + print_result (job_state w1a); + [%expect{| job1 |}]; + (* Worker 2 / job 1 *) + print_result (job_state w2a); + [%expect{| job2 |}]; Pool.release w2; - flush_queue w1 ~expect:["job3"] + flush_queue w1 >|= fun () -> + [%expect {| Flush job3 |}] (* Bias assignment towards workers that have things cached. *) -let cached_scheduling () = +let%expect_test "cached_scheduling" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"cached_scheduling" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in @@ -88,14 +120,17 @@ let cached_scheduling () = Pool.set_active w2 true; let w1a = Pool.pop w1 in let w2a = Pool.pop w2 in - Alcotest.(check string) "Workers ready" "\ - capacity: 2\n\ - queue: (ready) [worker-2 worker-1]\n\ - registered:\n\ - \ worker-1 (0): []\n\ - \ worker-2 (0): []\n\ - clients: \n\ - cached: \n" (Fmt.to_to_string Pool.dump pool); + (* Workers ready *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (ready) [worker-2 worker-1] + registered: + worker-1 (0): [] + worker-2 (0): [] + clients: + cached: + |}]; let user = Pool.client pool ~client_id:"u1" in Pool.Client.set_rate user 2.0; submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; @@ -103,64 +138,91 @@ let cached_scheduling () = submit user ~urgent:false @@ job "job3" ~cache_hint:"a"; submit user ~urgent:false @@ job "job4" ~cache_hint:"a"; submit user ~urgent:false @@ job "job5" ~cache_hint:"c"; - Alcotest.(check string) "Jobs queued" "\ - capacity: 2\n\ - queue: (backlog) [u1:job5@12s u1:job4@11s u1:job3@10s]\n\ - registered:\n\ - \ worker-1 (10): [u1:job1@0s(10)]\n\ - \ worker-2 (10): [u1:job2@5s(10)]\n\ - clients: u1(2)+17s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* Jobs queued *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [u1:job5@12s u1:job4@11s u1:job3@10s] + registered: + worker-1 (10): [u1:job1@0s(10)] + worker-2 (10): [u1:job2@5s(10)] + clients: u1(2)+17s + cached: a: [worker-1], b: [worker-2] + |}]; Lwt.pause () >>= fun () -> - Alcotest.(check string) "Jobs started" "\ - capacity: 2\n\ - queue: (backlog) [u1:job5@12s u1:job4@11s u1:job3@10s]\n\ - registered:\n\ - \ worker-1 (0): []\n\ - \ worker-2 (0): []\n\ - clients: u1(2)+17s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "job1") (job_state w1a); - Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job2") (job_state w2a); + (* Jobs started *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [u1:job5@12s u1:job4@11s u1:job3@10s] + registered: + worker-1 (0): [] + worker-2 (0): [] + clients: u1(2)+17s + cached: a: [worker-1], b: [worker-2] + |}]; + (* Worker 1 / job 1 *) + print_result (job_state w1a); + [%expect{| job1 |}]; + (* Worker 2 / job 1 *) + print_result (job_state w2a); + [%expect{| job2 |}]; (* Worker 2 asks for another job, but the next two jobs would be better done on worker-1. *) let w2b = Pool.pop w2 in - Alcotest.(check string) "Jobs 3 and 4 assigned to worker-1" "\ - capacity: 2\n\ - queue: (backlog) []\n\ - registered:\n\ - \ worker-1 (4): [u1:job4@11s(2) u1:job3@10s(2)]\n\ - \ worker-2 (0): []\n\ - clients: u1(2)+17s\n\ - cached: a: [worker-1], b: [worker-2], c: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Alcotest.(check pop_result) "Worker 2 / job 2" (Ok "job5") (job_state w2b); + (* Jobs 3 and 4 assigned to worker-1 *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [] + registered: + worker-1 (4): [u1:job4@11s(2) u1:job3@10s(2)] + worker-2 (0): [] + clients: u1(2)+17s + cached: a: [worker-1], b: [worker-2], c: [worker-2] + |}]; + (* Worker 2 / job 2 *) + print_result (job_state w2b); + [%expect{| job5 |}]; (* Worker 1 leaves. Its two queued jobs get reassigned. *) let w2c = Pool.pop w2 in - Alcotest.(check pop_result) "Worker 2 / job 3" (Error "pending") (job_state w2c); + (* Worker 2 / job 3 *) + print_result (job_state w2c); + [%expect{| pending |}]; Logs.info (fun f -> f "@[w1 about to leave:@,%a@]" Pool.dump pool); Pool.release w1; Lwt.pause () >>= fun () -> - Alcotest.(check string) "Worker-1's jobs reassigned" "\ - capacity: 1\n\ - queue: (backlog) [u1:job4@11s]\n\ - registered:\n\ - \ worker-2 (0): []\n\ - clients: u1(2)+17s\n\ - cached: a: [worker-1; worker-2], b: [worker-2], c: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Alcotest.(check pop_result) "Worker 2 / job 3" (Ok "job3") (job_state w2c); + (* Worker-1's jobs reassigned *) + print_pool pool; + [%expect{| + capacity: 1 + queue: (backlog) [u1:job4@11s] + registered: + worker-2 (0): [] + clients: u1(2)+17s + cached: a: [worker-1; worker-2], b: [worker-2], c: [worker-2] + |}]; + (* Worker 2 / job 3 *) + print_result (job_state w2c); + [%expect{| job3 |}]; let w2d = Pool.pop w2 in - Alcotest.(check pop_result) "Worker 2 / job 4" (Ok "job4") (job_state w2d); + (* Worker 2 / job 4 *) + print_result (job_state w2d); + [%expect{| job4 |}]; Fake_time.advance 20; Pool.release w2; - Alcotest.(check string) "Idle" "\ - capacity: 0\n\ - queue: (backlog) []\n\ - registered:\n\ - clients: u1(2)\n\ - cached: a: [worker-1; worker-2], b: [worker-2], c: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* Idle *) + print_pool pool; + [%expect {| + capacity: 0 + queue: (backlog) [] + registered: + clients: u1(2) + cached: a: [worker-1; worker-2], b: [worker-2], c: [worker-2] + |}]; Lwt.return_unit (* Bias assignment towards workers that have things cached, but not so much that it takes longer. *) -let unbalanced () = +let%expect_test "unbalanced" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"unbalanced" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in @@ -180,21 +242,34 @@ let unbalanced () = let _ = Pool.pop w1 in let w2a = Pool.pop w2 in Lwt.pause () >>= fun () -> - Alcotest.(check string) "Worker-2 got jobs eventually" "\ - capacity: 2\n\ - queue: (backlog) []\n\ - registered:\n\ - \ worker-1 (12): [u1:job7@10s(2) u1:job6@9s(2) u1:job5@8s(2) u1:job4@7s(2)\n\ - \ u1:job3@6s(2) u1:job2@5s(2)]\n\ - \ worker-2 (0): []\n\ - clients: u1(2)+12s\n\ - cached: a: [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job8") (job_state w2a); + (* Worker-2 got jobs eventually *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [] + registered: + worker-1 (12): [u1:job7@10s(2) u1:job6@9s(2) u1:job5@8s(2) u1:job4@7s(2) + u1:job3@6s(2) u1:job2@5s(2)] + worker-2 (0): [] + clients: u1(2)+12s + cached: a: [worker-1; worker-2] + |}]; + (* Worker 2 / job 1 *) + print_result (job_state w2a); + [%expect{| job8 |}]; Pool.release w1; - flush_queue w2 ~expect:["job2"; "job3"; "job4"; "job5"; "job6"; "job7"] + flush_queue w2 >|= fun () -> + [%expect {| + Flush job2 + Flush job3 + Flush job4 + Flush job5 + Flush job6 + Flush job7 + |}] (* There are no workers available sometimes. *) -let no_workers () = +let%expect_test "no_workers" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"no_workers" in let user = Pool.client pool ~client_id:"u1" in @@ -205,18 +280,22 @@ let no_workers () = let _ = Pool.pop w1 in Pool.release w1; Lwt.pause () >>= fun () -> - Alcotest.(check string) "Worker-1 gone" "\ - capacity: 0\n\ - queue: (backlog) [u1:job2@10s]\n\ - registered:\n\ - clients: u1(1)+12s\n\ - cached: a: [worker-1]\n" (Fmt.to_to_string Pool.dump pool); + (* Worker-1 gone *) + print_pool pool; + [%expect{| + capacity: 0 + queue: (backlog) [u1:job2@10s] + registered: + clients: u1(1)+12s + cached: a: [worker-1] + |}]; let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; - flush_queue w1 ~expect:["job2"] + flush_queue w1 >|= fun () -> + [%expect {| Flush job2 |}] (* We remember cached locations across restarts. *) -let persist () = +let%expect_test "persist" = with_test_db @@ fun db -> begin let pool = Pool.create ~db ~name:"persist" in @@ -240,15 +319,18 @@ let persist () = let w1a = Pool.pop w1 in submit user ~urgent:false @@ job "job2" ~cache_hint:"a"; Lwt.pause () >>= fun () -> - Alcotest.(check pop_result) "Worker 1 gets the job" (Ok "job2") (job_state w1a); - Alcotest.(check pop_result) "Worker 2 doesn't" (Error "pending") (job_state w2a); + (* Worker 1 gets the job *) + print_result (job_state w1a); + [%expect{| job2 |}]; + (* Worker 2 doesn't *) + print_result (job_state w2a); + [%expect{| pending |}]; Pool.release w1; Pool.release w2; - Lwt.pause () >>= fun () -> - Lwt.return_unit + Lwt.pause () (* Urgent jobs go first. *) -let urgent () = +let%expect_test "urgent" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"urgent" in let user = Pool.client pool ~client_id:"u1" in @@ -259,27 +341,39 @@ let urgent () = let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; let w1a = Pool.pop w1 in - Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "job2") (job_state w1a); + (* Worker 1 / job 1 *) + print_result (job_state w1a); + [%expect{| job2 |}]; (* Worker 2 joins and gets job 4, due to the cache hints: *) let w2 = Pool.register pool ~name:"worker-2" ~capacity:1 |> Result.get_ok in Pool.set_active w2 true; let w2a = Pool.pop w2 in - Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job4") (job_state w2a); + (* Worker 2 / job 1 *) + print_result (job_state w2a); + [%expect{| job4 |}]; (* Worker 1 leaves. Jobs 1 and 3 get pushed back, keeping their ugency level. *) Pool.release w1; - Alcotest.(check string) "Worker-1 gone" "\ - capacity: 1\n\ - queue: (backlog) [u1:job1@0s u1:job3@12s+urgent]\n\ - registered:\n\ - \ worker-2 (0): []\n\ - clients: u1(1)+24s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* Worker-1 gone *) + print_pool pool; + [%expect{| + capacity: 1 + queue: (backlog) [u1:job1@0s u1:job3@12s+urgent] + registered: + worker-2 (0): [] + clients: u1(1)+24s + cached: a: [worker-1], b: [worker-2] + |}]; (* Urgent job 5 goes ahead of non-urgent job 1, but behind the existing urgent job 3. *) submit user ~urgent:true @@ job "job5" ~cache_hint:"b"; - flush_queue w2 ~expect:["job3"; "job5"; "job1"] + flush_queue w2 >|= fun () -> + [%expect {| + Flush job3 + Flush job5 + Flush job1 + |}] (* Urgent jobs go first on worker queues too. *) -let urgent_worker () = +let%expect_test "urgent_worker" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"urgent-worker" in let user = Pool.client pool ~client_id:"u1" in @@ -289,39 +383,56 @@ let urgent_worker () = let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; let w1a = Pool.pop w1 in - Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "job1") (job_state w1a); + (* Worker 1 / job 1 *) + print_result (job_state w1a); + [%expect{| job1 |}]; (* Worker 2 joins and gets job 3, due to the cache hints, putting job2 on worker 1's queue. *) let w2 = Pool.register pool ~name:"worker-2" ~capacity:1 |> Result.get_ok in Pool.set_active w2 true; let w2a = Pool.pop w2 in - Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job3") (job_state w2a); - Alcotest.(check string) "Worker-1 queue has job 2 queued" "\ - capacity: 2\n\ - queue: (backlog) []\n\ - registered:\n\ - \ worker-1 (2): [u1:job2@10s(2)]\n\ - \ worker-2 (0): []\n\ - clients: u1(1)+22s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* Worker 2 / job 1 *) + print_result (job_state w2a); + [%expect{| job3 |}]; + (* Worker-1 queue has job 2 queued *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [] + registered: + worker-1 (2): [u1:job2@10s(2)] + worker-2 (0): [] + clients: u1(1)+22s + cached: a: [worker-1], b: [worker-2] + |}]; Pool.Client.set_rate user 2.0; submit user ~urgent:true @@ job "job4" ~cache_hint:"a"; submit user ~urgent:false @@ job "job5" ~cache_hint:"b"; submit user ~urgent:true @@ job "job6" ~cache_hint:"a"; let w2b = Pool.pop w2 in - Alcotest.(check pop_result) "Worker 2 / job 2" (Ok "job5") (job_state w2b); - Alcotest.(check string) "Worker-1 gets job4 first" "\ - capacity: 2\n\ - queue: (backlog) []\n\ - registered:\n\ - \ worker-1 (6): [u1:job2@10s(2) u1:job6@24s(2+urgent) u1:job4@22s(2+urgent)]\n\ - \ worker-2 (0): []\n\ - clients: u1(2)+25s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - flush_queue w1 ~expect:["job4"; "job6"; "job2"] + (* Worker 2 / job 2 *) + print_result (job_state w2b); + [%expect{| job5 |}]; + (* Worker-1 gets job4 first *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [] + registered: + worker-1 (6): [u1:job2@10s(2) u1:job6@24s(2+urgent) u1:job4@22s(2+urgent)] + worker-2 (0): [] + clients: u1(2)+25s + cached: a: [worker-1], b: [worker-2] + |}]; + flush_queue w1 >|= fun () -> + [%expect {| + Flush job4 + Flush job6 + Flush job2 + |}] (* Workers can be paused and resumed. *) -let inactive () = +let%expect_test "inactive" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"inactive" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in @@ -334,45 +445,59 @@ let inactive () = Pool.set_active w2 true; let w2a = Pool.pop w2 in Lwt.pause () >>= fun () -> - Alcotest.(check string) "Both jobs assigned to Worker-1" "\ - capacity: 2\n\ - queue: (ready) [worker-2]\n\ - registered:\n\ - \ worker-1 (2): [u1:job2@10s(2)]\n\ - \ worker-2 (0): []\n\ - clients: u1(1)+12s\n\ - cached: a: [worker-1]\n" (Fmt.to_to_string Pool.dump pool); + (* Both jobs assigned to Worker-1 *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (ready) [worker-2] + registered: + worker-1 (2): [u1:job2@10s(2)] + worker-2 (0): [] + clients: u1(1)+12s + cached: a: [worker-1] + |}]; (* Deactivate worker-1. Its job is reassigned. *) Pool.set_active w1 false; - Alcotest.(check string) "Job reassigned to Worker-2" "\ - capacity: 2\n\ - queue: (ready) []\n\ - registered:\n\ - \ worker-1 (0): (inactive)\n\ - \ worker-2 (10): [u1:job2@10s(10)]\n\ - clients: u1(1)+12s\n\ - cached: a: [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* Job reassigned to Worker-2 *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (ready) [] + registered: + worker-1 (0): (inactive) + worker-2 (10): [u1:job2@10s(10)] + clients: u1(1)+12s + cached: a: [worker-1; worker-2] + |}]; Lwt.pause () >>= fun () -> - Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "job1") (job_state w1a); - Alcotest.(check pop_result) "Worker 2 / job 2" (Ok "job2") (job_state w2a); + (* Worker 1 / job 1 *) + print_result (job_state w1a); + [%expect{| job1 |}]; + (* Worker 2 / job 2 *) + print_result (job_state w2a); + [%expect{| job2 |}]; Pool.Client.set_rate user 2.0; submit user ~urgent:false @@ job "job3" ~cache_hint:"a"; (* Deactivate worker-2. *) Pool.set_active w2 false; - Alcotest.(check string) "Job unassigned" "\ - capacity: 2\n\ - queue: (backlog) [u1:job3@12s]\n\ - registered:\n\ - \ worker-1 (0): (inactive)\n\ - \ worker-2 (0): (inactive)\n\ - clients: u1(2)+13s\n\ - cached: a: [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* Job unassigned *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [u1:job3@12s] + registered: + worker-1 (0): (inactive) + worker-2 (0): (inactive) + clients: u1(2)+13s + cached: a: [worker-1; worker-2] + |}]; Pool.set_active w2 true; Pool.release w1; - flush_queue w2 ~expect:["job3"] + flush_queue w2 >|= fun () -> + [%expect {| Flush job3 |}] (* The user cancels while the item is assigned. *) -let cancel_worker_queue () = +let%expect_test "cancel_worker_queue" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"cancel_worker_queue" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in @@ -388,47 +513,69 @@ let cancel_worker_queue () = let j2 = Pool.Client.submit user ~urgent:false @@ job "job2" ~cache_hint:"a" in let j3 = Pool.Client.submit user ~urgent:false @@ job "job3" ~cache_hint:"a" in let j4 = Pool.Client.submit user ~urgent:false @@ job "job4" ~cache_hint:"b" in - Pool.Client.cancel user j4 |> Alcotest.(check (result pass reject)) "job4 cancelled" (Ok ()); - Alcotest.(check string) "Jobs assigned" "\ - capacity: 2\n\ - queue: (ready) []\n\ - registered:\n\ - \ worker-1 (4): [u1:job3@6s(2) u1:job2@5s(2)]\n\ - \ worker-2 (0): []\n\ - clients: u1(2)+12s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Pool.Client.cancel user j2 |> Alcotest.(check (result pass reject)) "job2 cancelled" (Ok ()); - Alcotest.(check string) "Job2 cancelled" "\ - capacity: 2\n\ - queue: (ready) []\n\ - registered:\n\ - \ worker-1 (2): [u1:job3@6s(2)]\n\ - \ worker-2 (0): []\n\ - clients: u1(2)+12s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* job4 cancelled *) + println "%a" pp_cancel (Pool.Client.cancel user j4); + [%expect{| cancelled |}]; + (* Jobs assigned *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (ready) [] + registered: + worker-1 (4): [u1:job3@6s(2) u1:job2@5s(2)] + worker-2 (0): [] + clients: u1(2)+12s + cached: a: [worker-1], b: [worker-2] + |}]; + (* job2 cancelled *) + println "%a" pp_cancel (Pool.Client.cancel user j2); + [%expect{| cancelled |}]; + (* Job2 cancelled *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (ready) [] + registered: + worker-1 (2): [u1:job3@6s(2)] + worker-2 (0): [] + clients: u1(2)+12s + cached: a: [worker-1], b: [worker-2] + |}]; Pool.release w2; Pool.set_active w1 false; - Alcotest.(check string) "Job3 pushed back" "\ - capacity: 1\n\ - queue: (backlog) [u1:job3@6s]\n\ - registered:\n\ - \ worker-1 (0): (inactive)\n\ - clients: u1(2)+12s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Pool.Client.cancel user j3 |> Alcotest.(check (result pass reject)) "job3 cancelled" (Ok ()); + (* Job3 pushed back *) + print_pool pool; + [%expect{| + capacity: 1 + queue: (backlog) [u1:job3@6s] + registered: + worker-1 (0): (inactive) + clients: u1(2)+12s + cached: a: [worker-1], b: [worker-2] + |}]; + (* job3 cancelled *) + println "%a" pp_cancel (Pool.Client.cancel user j3); + [%expect{| cancelled |}]; Pool.release w1; - Alcotest.(check string) "Job3 cancelled" "\ - capacity: 0\n\ - queue: (backlog) []\n\ - registered:\n\ - clients: u1(2)+12s\n\ - cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Alcotest.(check pop_result) "Finish worker-1" (Ok "job1") (job_state w1a); - Alcotest.(check pop_result) "Finish worker-2" (Error "pending") (job_state w2a); + (* Job3 cancelled *) + print_pool pool; + [%expect{| + capacity: 0 + queue: (backlog) [] + registered: + clients: u1(2)+12s + cached: a: [worker-1], b: [worker-2] + |}]; + (* Finish worker-1 *) + print_result (job_state w1a); + [%expect{| job1 |}]; + (* Finish worker-2 *) + print_result (job_state w2a); + [%expect {| pending |}]; Lwt.return_unit (* A worker is marked as inactive. Its items go back on the main queue. *) -let push_back () = +let%expect_test "push_back" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"push_back" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in @@ -444,28 +591,38 @@ let push_back () = submit user ~urgent:false @@ job "job3" ~cache_hint:"a"; Pool.set_active w2 true; Lwt.pause () >>= fun () -> - Alcotest.(check string) "Jobs assigned" "\ - capacity: 2\n\ - queue: (ready) [worker-2]\n\ - registered:\n\ - \ worker-1 (4): [u1:job3@6s(2) u1:job2@5s(2)]\n\ - \ worker-2 (0): []\n\ - clients: u1(2)+7s\n\ - cached: a: [worker-1]\n" (Fmt.to_to_string Pool.dump pool); + (* Jobs assigned *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (ready) [worker-2] + registered: + worker-1 (4): [u1:job3@6s(2) u1:job2@5s(2)] + worker-2 (0): [] + clients: u1(2)+7s + cached: a: [worker-1] + |}]; Pool.release w2; Pool.set_active w1 false; - Alcotest.(check string) "Jobs pushed back" "\ - capacity: 1\n\ - queue: (backlog) [u1:job3@6s u1:job2@5s]\n\ - registered:\n\ - \ worker-1 (0): (inactive)\n\ - clients: u1(2)+7s\n\ - cached: a: [worker-1]\n" (Fmt.to_to_string Pool.dump pool); + (* Jobs pushed back *) + print_pool pool; + [%expect{| + capacity: 1 + queue: (backlog) [u1:job3@6s u1:job2@5s] + registered: + worker-1 (0): (inactive) + clients: u1(2)+7s + cached: a: [worker-1] + |}]; Pool.set_active w1 true; - flush_queue w1 ~expect:["job2"; "job3"] + flush_queue w1 >|= fun () -> + [%expect {| + Flush job2 + Flush job3 + |}] (* Two clients share the cluster. *) -let fairness () = +let%expect_test "fairness" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"fairness" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in @@ -487,21 +644,34 @@ let fairness () = submit bob ~urgent:false @@ job "b2"; submit bob ~urgent:false @@ job "b3"; Lwt.pause () >>= fun () -> - Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "a1") (job_state w1a); - Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "a2") (job_state w2a); - Alcotest.(check string) "Bob's jobs aren't all last" "\ - capacity: 2\n\ - queue: (backlog) [bob:b3@10s alice:a3@10s bob:b2@5s bob:b1@0s]\n\ - registered:\n\ - \ worker-1 (0): []\n\ - \ worker-2 (0): []\n\ - clients: alice(2)+15s bob(2)+15s\n\ - cached: : [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* Worker 1 / job 1 *) + print_result (job_state w1a); + [%expect{| a1 |}]; + (* Worker 2 / job 1 *) + print_result (job_state w2a); + [%expect{| a2 |}]; + (* Bob's jobs aren't all last *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [bob:b3@10s alice:a3@10s bob:b2@5s bob:b1@0s] + registered: + worker-1 (0): [] + worker-2 (0): [] + clients: alice(2)+15s bob(2)+15s + cached: : [worker-1; worker-2] + |}]; Pool.release w2; - flush_queue w1 ~expect:["b1"; "b2"; "a3"; "b3"] + flush_queue w1 >|= fun () -> + [%expect {| + Flush b1 + Flush b2 + Flush a3 + Flush b3 + |}] (* Two clients with different rates share the cluster. *) -let fairness_rates () = +let%expect_test "fairness_rates" = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"fairness_rates" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in @@ -525,50 +695,28 @@ let fairness_rates () = submit bob ~urgent:false @@ job "b2"; submit bob ~urgent:false @@ job "b3"; Lwt.pause () >>= fun () -> - Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "a1") (job_state w1a); - Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "a2") (job_state w2a); - Alcotest.(check string) "Bob's jobs aren't all last" "\ - capacity: 2\n\ - queue: (backlog) [bob:b3@20s bob:b2@10s alice:a3@4s bob:b1@0s]\n\ - registered:\n\ - \ worker-1 (0): []\n\ - \ worker-2 (0): []\n\ - clients: alice(5)+6s bob(1)+30s\n\ - cached: : [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); + (* Worker 1 / job 1 *) + print_result (job_state w1a); + [%expect{| a1 |}]; + (* Worker 2 / job 1 *) + print_result (job_state w2a); + [%expect{| a2 |}]; + (* Bob's jobs aren't all last *) + print_pool pool; + [%expect{| + capacity: 2 + queue: (backlog) [bob:b3@20s bob:b2@10s alice:a3@4s bob:b1@0s] + registered: + worker-1 (0): [] + worker-2 (0): [] + clients: alice(5)+6s bob(1)+30s + cached: : [worker-1; worker-2] + |}]; Pool.release w2; - flush_queue w1 ~expect:["b1"; "a3"; "b2"; "b3"] - -let test_case name fn = - Alcotest_lwt.test_case name `Quick @@ fun _ () -> - Lwt_unix.yield () >>= fun () -> (* Ensure we're inside the Lwt mainloop. Lwt.pause behaves strangely otherwise. *) - Fake_time.now := 1.0; - fn () >>= fun () -> - Lwt.pause () >|= fun () -> - Prometheus.CollectorRegistry.(collect default) - |> Fmt.to_to_string Prometheus_app.TextFormat_0_0_4.output - |> String.split_on_char '\n' - |> List.iter (fun line -> - if Astring.String.is_prefix ~affix:"scheduler_pool_" line then ( - match Astring.String.cut ~sep:"} " line with - | None -> Fmt.failwith "Bad metrics line: %S" line - | Some (key, _) when Astring.String.is_infix ~affix:"_total{" key -> () - | Some (key, value) -> - if float_of_string value <> 0.0 then - Fmt.failwith "Non-zero metric after test: %s=%s" key value - ) - ) - -let suite = [ - test_case "scheduling" simple; - test_case "cached_scheduling" cached_scheduling; - test_case "unbalanced" unbalanced; - test_case "no_workers" no_workers; - test_case "persist" persist; - test_case "urgent" urgent; - test_case "urgent_worker" urgent_worker; - test_case "inactive" inactive; - test_case "cancel_worker_queue" cancel_worker_queue; - test_case "push_back" push_back; - test_case "fairness" fairness; - test_case "fairness_rates" fairness_rates; -] + flush_queue w1 >|= fun () -> + [%expect {| + Flush b1 + Flush a3 + Flush b2 + Flush b3 + |}] diff --git a/test/test_scheduling.mli b/test/test_scheduling.mli index b367942d..e69de29b 100644 --- a/test/test_scheduling.mli +++ b/test/test_scheduling.mli @@ -1 +0,0 @@ -val suite : unit Alcotest_lwt.test_case list