Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions api/registration.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ let local ~register =
match worker with
| None -> Service.fail "Missing worker argument!"
| Some worker ->
let response, results = Service.Response.create Results.init_pointer in
let queue = register ~name ~capacity worker in
Results.queue_set results (Some queue);
Capability.dec_ref queue;
Service.return response
match register ~name ~capacity worker with
| Error `Name_taken -> Service.fail "Worker already registered!"
| Ok queue ->
let response, results = Service.Response.create Results.init_pointer in
Results.queue_set results (Some queue);
Capability.dec_ref queue;
Service.return response
end

module X = Raw.Client.Registration
Expand Down
9 changes: 6 additions & 3 deletions scheduler/cluster_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,24 @@ module Pool_api = struct

let register t ~name ~capacity worker =
match Pool.register t.pool ~name ~capacity with
| Error `Name_taken ->
Fmt.failwith "Worker already registered!";
| Error `Name_taken as e ->
Log.warn (fun f -> f "Worker %S already registered!" name);
e
| Ok q ->
Pool.set_active q true;
Log.info (fun f -> f "Registered new worker %S" name);
Hashtbl.add t.workers name worker;
Lwt_condition.broadcast t.cond ();
Cluster_api.Queue.local
let queue = Cluster_api.Queue.local
~pop:(pop q)
~set_active:(Pool.set_active q)
~release:(fun () ->
Hashtbl.remove t.workers name;
Capability.dec_ref worker;
Pool.release q
)
in
Ok queue

let registration_service t =
let register = register t in
Expand Down