Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[submodule "obuilder"]
path = obuilder
url = https://github.com/ocurrent/obuilder.git
[submodule "solver-service"]
path = solver-service
url = https://github.com/patricoferris/solver-service
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ At the moment, two build types are provided: building a Dockerfile, or building
In either case, the build may done in the context of some Git commit.
The scheduler tries to schedule similar builds on the same machine, to benefit from caching.

The scheduler also supports submitting custom jobs. These jobs are defined by the user and require cluster workers that can handle them. The Docker [example](./examples/docker_pipeline.ml) shows how solver jobs can be submitted to the cluster as well as a Docker job (which could also be an OBuilder job).

## Contents

<!-- vim-markdown-toc GFM -->
Expand Down Expand Up @@ -130,7 +132,7 @@ ocluster-admin --connect ./capnp-secrets/admin.cap add-client test-user > submis
```

There is a command-line client, and a plugin for use in [OCurrent](https://github.com/ocurrent/ocurrent) pipelines.
See [obuilder_pipeline.ml](./examples/obuilder_pipeline.ml) for an example pipeline using the plugin.
See [docker_pipeline.ml](./examples/docker_pipeline.ml) for an example pipeline using the plugin.

You might want to create an alias for the admin and submission clients, e.g.

Expand Down
21 changes: 21 additions & 0 deletions api/custom.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
type payload = Raw.Reader.pointer_t

type t = {
kind : string;
payload : payload;
}

let v ~kind payload = { kind; payload }

let kind t = t.kind
let payload t = t.payload

let read (action : Raw.Reader.Custom.t) =
let payload = Raw.Reader.Custom.payload_get action in
let kind = Raw.Reader.Custom.kind_get action in
{ kind; payload }

let init b { kind; payload } =
Raw.Builder.Custom.kind_set b kind;
let _ : Raw.Builder.pointer_t = Raw.Builder.Custom.payload_set_reader b payload in
()
20 changes: 20 additions & 0 deletions api/custom.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type t
(** Custom job specifications *)

type payload = Raw.Reader.pointer_t
(** Raw, untyped payload *)

val v : kind:string -> payload -> t
(** [v ~kind payload] is a custom job specification *)

val kind : t -> string
(** A string describing the kind of custom job *)

val payload : t -> Raw.Reader.pointer_t
(** The dynamic payload of the custom job *)

val init : Raw.Builder.Custom.t -> t -> unit
(** [init builder t] initialises a fresh builder with the values from [t]. *)

val read : Raw.Reader.Custom.t -> t
(** [read c] reads the buffer and returns a custom job specification. *)
9 changes: 9 additions & 0 deletions api/schema.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ struct OBuilder {
# The contents of the OBuilder spec to build.
}

struct Custom {
kind @0 :Text;
# A name describing the kind of custom job

payload @1 :AnyPointer;
# A custom job with a dynamic payload
}

struct Secret {
id @0 :Text;
# The secret id.
Expand All @@ -53,6 +61,7 @@ struct JobDescr {
action :union {
dockerBuild @0 :DockerBuild;
obuilder @4 :OBuilder;
custom @6 :Custom;
}

cacheHint @1 :Text;
Expand Down
7 changes: 7 additions & 0 deletions api/submission.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@ type t = X.t Capability.t
type action =
| Docker_build of Docker.Spec.t
| Obuilder_build of Obuilder_job.Spec.t
| Custom_build of Custom.t

let docker_build ?push_to ?(options=Docker.Spec.defaults) dockerfile =
Docker_build { Docker.Spec.dockerfile; options; push_to }

let obuilder_build spec =
Obuilder_build { Obuilder_job.Spec.spec = `Contents spec }

let custom_build c = Custom_build c

let get_action descr =
let module JD = Raw.Reader.JobDescr in
match JD.action_get descr |> JD.Action.get with
| DockerBuild action -> Docker_build (Docker.Spec.read action)
| Obuilder action -> Obuilder_build (Obuilder_job.Spec.read action)
| Custom action -> Custom_build (Custom.read action)
| Undefined x -> Fmt.failwith "Unknown action type %d" x

let submit ?src ?(urgent=false) ?(secrets=[]) t ~pool ~action ~cache_hint =
Expand All @@ -55,6 +59,9 @@ let submit ?src ?(urgent=false) ?(secrets=[]) t ~pool ~action ~cache_hint =
| Obuilder_build action ->
let b = JD.Action.obuilder_init act in
Obuilder_job.Spec.init b action
| Custom_build action ->
let b = JD.Action.custom_init act in
Custom.init b action
end;
JD.cache_hint_set b cache_hint;
src |> Option.iter (fun (repo, commits) ->
Expand Down
2 changes: 1 addition & 1 deletion bin/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(public_names ocluster-scheduler ocluster-client ocluster-worker ocluster-admin)
(package ocluster)
(names scheduler client worker admin)
(libraries dune-build-info ocluster-api logs.cli logs.fmt fmt.cli fmt.tty capnp-rpc-unix cluster_scheduler cluster_worker prometheus-app.unix db
(libraries dune-build-info ocluster-api logs.cli logs.fmt fmt.cli fmt.tty capnp-rpc-unix cluster_scheduler cluster_worker prometheus-app.unix db solver-service
(select winsvc_wrapper.ml from
(winsvc -> winsvc_wrapper.winsvc.ml)
( -> winsvc_wrapper..ml))))
Expand Down
83 changes: 83 additions & 0 deletions bin/solver.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
(* Workers that can also solve opam jobs *)

open Lwt.Syntax

let solve_to_custom req =
let open Cluster_api.Raw in
let params =
Yojson.Safe.to_string
@@ Solver_service_api.Worker.Solve_request.to_yojson req
in
let custom = Builder.Custom.init_root () in
let builder = Builder.Custom.payload_get custom in
let request =
Solver_service_api.Raw.Builder.Solver.Solve.Params.init_pointer builder
in
Solver_service_api.Raw.Builder.Solver.Solve.Params.request_set request params;
let r = Reader.Custom.of_builder custom in
Reader.Custom.payload_get r

let solve_of_custom c =
let open Solver_service_api.Raw in
let payload = Cluster_api.Custom.payload c in
let request =
Reader.Solver.Solve.Params.request_get @@ Reader.of_pointer payload
in
Solver_service_api.Worker.Solve_request.of_yojson
@@ Yojson.Safe.from_string request

module Service = Solver_service.Service.Make (Solver_service.Opam_repository)

let cluster_worker_log log =
let module L = Solver_service_api.Raw.Service.Log in
L.local
@@ object
inherit L.service

method write_impl params release_param_caps =
let open L.Write in
release_param_caps ();
let msg = Params.msg_get params in
Cluster_worker.Log_data.write log msg;
Capnp_rpc_lwt.Service.(return (Response.create_empty ()))
end

let solve ~solver ~switch:_ ~log c =
match solve_of_custom c with
| Error m -> failwith m
| Ok s ->
let+ response =
Solver_service_api.Solver.solve ~log:(cluster_worker_log log) solver s
in
let response = Yojson.Safe.to_string @@ Solver_service_api.Worker.Solve_response.to_yojson response in
Cluster_worker.Log_data.write log response;
Ok response

let spawn_local ?solver_dir () : Solver_service_api.Solver.t =
Logs.info (fun f -> f "Setting up solver...");
let p, c = Unix.(socketpair PF_UNIX SOCK_STREAM 0 ~cloexec:true) in
Unix.clear_close_on_exec c;
let solver_dir = match solver_dir with None -> "solver" | Some x -> x in
let cmd = ("", [| "solver-service" |]) in
let _child =
Lwt_process.open_process_none ~cwd:solver_dir ~stdin:(`FD_move c) cmd
in
let switch = Lwt_switch.create () in
let p =
Lwt_unix.of_unix_file_descr p
|> Capnp_rpc_unix.Unix_flow.connect ~switch
|> Capnp_rpc_net.Endpoint.of_flow
(module Capnp_rpc_unix.Unix_flow)
~peer_id:Capnp_rpc_net.Auth.Digest.insecure ~switch
in
let conn =
Capnp_rpc_unix.CapTP.connect ~restore:Capnp_rpc_net.Restorer.none p
in
let solver =
Capnp_rpc_unix.CapTP.bootstrap conn
(Capnp_rpc_net.Restorer.Id.public "solver")
in
solver
|> Capnp_rpc_lwt.Capability.when_broken (fun ex ->
Fmt.failwith "Solver process failed: %a" Capnp_rpc.Exception.pp ex);
solver
16 changes: 16 additions & 0 deletions bin/solver.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
val solve_to_custom : Solver_service_api.Worker.Solve_request.t -> Cluster_api.Custom.payload
(** [solve_to_custom req] converts the solver request to a custom job specification. *)

val solve_of_custom : Cluster_api.Custom.t -> (Solver_service_api.Worker.Solve_request.t, string) result
(** [solve_of_custom c] tries to read the custom job specifcation as a solver request. *)

val solve :
solver:Solver_service_api.Solver.X.t Capnp_rpc_lwt.Capability.t ->
switch:'a ->
log:Cluster_worker.Log_data.t ->
Cluster_api.Custom.t ->
(string, 'b) result Lwt.t
(** [solve ~solver ~switch ~log c] interprets [c] as a solver request and solves it using [solver]. *)

val spawn_local : ?solver_dir:string -> unit -> Solver_service_api.Solver.t
(** [spawn_local ()] forks a process running a [solver-service] that communicates over standard input/output. *)
9 changes: 8 additions & 1 deletion bin/worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ let update_docker () =
let update_normal () =
Lwt.return (fun () -> Lwt.return ())

(* We extend the default build function to support solver jobs *)
let build ~solver ~switch ~log ~src ~secrets = function
| `Custom c -> Solver.solve ~solver ~switch ~log c
| _ as job_desc ->
Cluster_worker.default_build ~switch ~log ~src ~secrets job_desc

let main default_level ?formatter registration_path capacity name allow_push prune_threshold state_dir obuilder =
setup_log ?formatter default_level;
let update =
Expand All @@ -50,7 +56,8 @@ let main default_level ?formatter registration_path capacity name allow_push pru
Lwt_main.run begin
let vat = Capnp_rpc_unix.client_only_vat () in
let sr = Capnp_rpc_unix.Cap_file.load vat registration_path |> or_die in
Cluster_worker.run ~capacity ~name ~allow_push ?prune_threshold ?obuilder ~state_dir ~update sr
let solver = Solver.spawn_local ~solver_dir:state_dir () in
Cluster_worker.run ~build:(build ~solver) ~capacity ~name ~allow_push ?prune_threshold ?obuilder ~state_dir ~update sr
end

(* Command-line parsing *)
Expand Down
99 changes: 99 additions & 0 deletions examples/docker_pipeline.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
let program_name = "docker_pipeline"

open Current.Syntax

let () =
Logging.init ()

let opam_repository = { Current_github.Repo_id.owner = "ocaml"; name = "opam-repository" }
let repo = { Current_github.Repo_id.owner = "ocurrent"; name = "obuilder" }
let pool = "linux-x86_64"
let timeout = Duration.of_min 60

let docker_spec deps =
Fmt.str {|FROM busybox
COPY . /src
ENV DEPS %s
RUN ls -l /src
RUN echo $DEPS|} deps

let pipeline ~cluster vars () =
let src =
let+ src = Current_github.Api.Anonymous.head_of repo (`Ref "refs/heads/master") in
[ src ]
in
let packages = [
"obuilder-spec.dev", Fpath.v "obuilder-spec.opam";
"obuilder.dev", Fpath.v "obuilder.opam"
] in
let opamfiles = Solve.get_opamfile ~packages src in
let cluster = Current_ocluster.with_timeout (Some timeout) cluster in
let cache_hint = "ocluster-example" in
let request =
let+ opamfiles = opamfiles
and* opam_repo = Current_github.Api.Anonymous.head_of opam_repository (`Ref "refs/heads/master") in
let payload =
Solve.solve_to_custom Solver_service_api.Worker.Solve_request.{
opam_repository_commit = Current_git.Commit_id.hash opam_repo;
root_pkgs = opamfiles;
pinned_pkgs = [];
platforms = ["os", vars];
}
in
Cluster_api.Custom.v ~kind:"solve" payload
in
let spec =
let+ response = Current_ocluster.custom ~label:"solver" cluster ~src ~pool request in
match Solver_service_api.Worker.Solve_response.of_yojson (Yojson.Safe.from_string response) with
| Ok response -> (
match response with
| Ok (selection :: _) ->
let packages = selection.packages in
docker_spec (String.concat " " packages)
| Ok [] -> failwith "No packages found"
| Error (`Msg m) -> failwith m
)
| Error m -> failwith m
in
let spec = `Contents spec in
let options = Cluster_api.Docker.Spec.{
build_args = [];
squash = false;
buildkit = false;
include_git = true;
} in
Current_ocluster.build cluster ~cache_hint ~src ~pool ~options spec

let main config mode submission_uri =
let vat = Capnp_rpc_unix.client_only_vat () in
let vars = Lwt_main.run @@ Solve.get_vars ~ocaml_package_name:"obuilder" ~ocaml_version:"4.13.1" () in
let submission_cap = Capnp_rpc_unix.Vat.import_exn vat submission_uri in
let connection = Current_ocluster.Connection.create submission_cap in
let cluster = Current_ocluster.v connection in
let engine = Current.Engine.create ~config (pipeline ~cluster vars) in
let site = Current_web.Site.(v ~has_role:allow_all) ~name:program_name (Current_web.routes engine) in
Lwt_main.run begin
Lwt.choose [
Current.Engine.thread engine;
Current_web.run ~mode site;
]
end

(* Command-line parsing *)

open Cmdliner

let submission_service =
Arg.required @@
Arg.opt Arg.(some Capnp_rpc_unix.sturdy_uri) None @@
Arg.info
~doc:"The submission.cap file for the build scheduler service"
~docv:"FILE"
["submission-service"]

let cmd =
let doc = "Run a custom solver job and a Docker build on a cluster." in
Term.(term_result (const main $ Current.Config.cmdliner $ Current_web.cmdliner $ submission_service)),
Term.info program_name ~doc

let () = Term.(exit @@ eval cmd)
1 change: 1 addition & 0 deletions examples/docker_pipeline.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(** An OCurrent pipeline that submits custom solver jobs and builds a Dockerfile using the result of the solve. *)
4 changes: 2 additions & 2 deletions examples/dune
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
(executable
(name obuilder_pipeline)
(name docker_pipeline)
(libraries
obuilder-spec
current
current_github
current_ocluster
current_web
capnp-rpc-unix
solver-service-api
logs.fmt
fmt.tty))
Loading