Skip to content

Commit aa4fa9d

Browse files
committed
fix: disallow multiple rpc servers
Signed-off-by: Rudi Grinberg <[email protected]> ps-id: 491f09b0-a316-43b8-8df5-a05f53fa331a
1 parent d3837e2 commit aa4fa9d

File tree

13 files changed

+167
-55
lines changed

13 files changed

+167
-55
lines changed

Diff for: bin/build_cmd.ml

+6-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,12 @@ let run_build_command_poll_passive ~(common : Common.t) ~config ~request:_ :
8585
(* CR-someday aalekseyev: It would've been better to complain if [request] is
8686
non-empty, but we can't check that here because [request] is a function.*)
8787
let open Fiber.O in
88-
let rpc = Common.rpc common in
88+
let rpc =
89+
match Common.rpc common with
90+
| `Allow server -> server
91+
| `Forbid_builds ->
92+
Code_error.raise "rpc server must be allowed in passive mode" []
93+
in
8994
Scheduler.go_with_rpc_server_and_console_status_reporting ~common ~config
9095
(fun () ->
9196
Scheduler.Run.poll_passive

Diff for: bin/common.ml

+17-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type t =
3737
; build_dir : string
3838
; no_print_directory : bool
3939
; store_orig_src_dir : bool
40-
; rpc : Dune_rpc_impl.Server.t Lazy.t
40+
; rpc : [ `Allow of Dune_rpc_impl.Server.t Lazy.t | `Forbid_builds ]
4141
; default_target : Arg.Dep.t (* For build & runtest only *)
4242
; watch : Watch_mode_config.t
4343
; print_metrics : bool
@@ -75,7 +75,12 @@ let default_target t = t.default_target
7575

7676
let prefix_target t s = t.root.reach_from_root_prefix ^ s
7777

78-
let rpc t = Lazy.force t.rpc
78+
let rpc t =
79+
match t.rpc with
80+
| `Forbid_builds -> `Forbid_builds
81+
| `Allow rpc -> `Allow (Lazy.force rpc)
82+
83+
let forbid_builds t = { t with rpc = `Forbid_builds }
7984

8085
let stats t = t.stats
8186

@@ -1012,7 +1017,16 @@ let term ~default_root_is_cwd =
10121017
at_exit (fun () -> Dune_stats.close stats);
10131018
stats)
10141019
in
1015-
let rpc = lazy (Dune_rpc_impl.Server.create ~root:root.dir stats) in
1020+
let rpc =
1021+
`Allow
1022+
(lazy
1023+
(let registry =
1024+
match watch with
1025+
| Yes _ -> `Add
1026+
| No -> `Skip
1027+
in
1028+
Dune_rpc_impl.Server.create ~registry ~root:root.dir stats))
1029+
in
10161030
if store_digest_preimage then Dune_engine.Reversible_digest.enable ();
10171031
if print_metrics then (
10181032
Memo.Perf_counters.enable ();

Diff for: bin/common.mli

+9-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,15 @@ val capture_outputs : t -> bool
44

55
val root : t -> Workspace_root.t
66

7-
val rpc : t -> Dune_rpc_impl.Server.t
7+
val rpc :
8+
t
9+
-> [ `Allow of Dune_rpc_impl.Server.t
10+
(** Will run rpc if in watch mode and acquire the build lock *)
11+
| `Forbid_builds
12+
(** Promise not to build anything. For now, this isn't checked *)
13+
]
14+
15+
val forbid_builds : t -> t
816

917
val stats : t -> Dune_stats.t option
1018

Diff for: bin/import.ml

+22-2
Original file line numberDiff line numberDiff line change
@@ -139,27 +139,47 @@ module Scheduler = struct
139139
(Constant
140140
(Pp.seq message (Pp.verbatim ", waiting for filesystem changes...")))
141141

142+
let rpc server =
143+
{ Dune_engine.Rpc.run = Dune_rpc_impl.Server.run server
144+
; stop = Dune_rpc_impl.Server.stop server
145+
; ready = Dune_rpc_impl.Server.ready server
146+
}
147+
142148
let go ~(common : Common.t) ~config:dune_config f =
143149
let stats = Common.stats common in
144150
let config =
145151
let insignificant_changes = Common.insignificant_changes common in
146152
Dune_config.for_scheduler dune_config stats ~insignificant_changes
147153
~signal_watcher:`Yes
148154
in
155+
let f =
156+
match Common.rpc common with
157+
| `Allow server ->
158+
fun () -> Dune_engine.Rpc.with_background_rpc (rpc server) f
159+
| `Forbid_builds -> f
160+
in
149161
Run.go config ~on_event:(on_event dune_config) f
150162

151163
let go_with_rpc_server_and_console_status_reporting ~(common : Common.t)
152164
~config:dune_config run =
165+
let server =
166+
match Common.rpc common with
167+
| `Allow server -> rpc server
168+
| `Forbid_builds ->
169+
Code_error.raise "rpc must be enabled in polling mode" []
170+
in
153171
let stats = Common.stats common in
154172
let config =
155173
let insignificant_changes = Common.insignificant_changes common in
156174
Dune_config.for_scheduler dune_config stats ~insignificant_changes
157175
~signal_watcher:`Yes
158176
in
159177
let file_watcher = Common.file_watcher common in
160-
let rpc = Common.rpc common in
161178
let run () =
162-
Fiber.fork_and_join_unit (fun () -> Dune_rpc_impl.Server.run rpc) run
179+
let open Fiber.O in
180+
Dune_engine.Rpc.with_background_rpc server @@ fun () ->
181+
let* () = Dune_engine.Rpc.ensure_ready () in
182+
run ()
163183
in
164184
Run.go config ~file_watcher ~on_event:(on_event dune_config) run
165185
end

Diff for: bin/ocaml_merlin.ml

+3-1
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,9 @@ let term =
207207
debugging purposes only and should not be considered as a stable \
208208
output.")
209209
in
210-
let common = Common.set_print_directory common false in
210+
let common =
211+
Common.set_print_directory common false |> Common.forbid_builds
212+
in
211213
let config = Common.init common ~log_file:No_log_file in
212214
Scheduler.go ~common ~config (fun () ->
213215
match dump_config with

Diff for: src/csexp_rpc/csexp_rpc.ml

+11-5
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,6 @@ module Server = struct
227227
}
228228

229229
let create fd sockaddr ~backlog =
230-
Unix.set_nonblock fd;
231-
Unix.setsockopt fd Unix.SO_REUSEADDR true;
232-
Socket.bind fd sockaddr;
233230
Unix.listen fd backlog;
234231
let r_interrupt_accept, w_interrupt_accept = Unix.pipe ~cloexec:true () in
235232
Unix.set_nonblock r_interrupt_accept;
@@ -268,6 +265,7 @@ module Server = struct
268265
[ `Init of Unix.file_descr | `Running of Transport.t | `Closed ]
269266
; backlog : int
270267
; sockaddr : Unix.sockaddr
268+
; ready : unit Fiber.Ivar.t
271269
}
272270

273271
let create sockaddr ~backlog =
@@ -276,19 +274,27 @@ module Server = struct
276274
(Unix.domain_of_sockaddr sockaddr)
277275
Unix.SOCK_STREAM 0
278276
in
279-
{ sockaddr; backlog; state = `Init fd }
277+
Unix.set_nonblock fd;
278+
Unix.setsockopt fd Unix.SO_REUSEADDR true;
279+
match Socket.bind fd sockaddr with
280+
| exception Unix.Unix_error (EADDRINUSE, _, _) -> Error `Already_in_use
281+
| () ->
282+
Ok { sockaddr; backlog; state = `Init fd; ready = Fiber.Ivar.create () }
283+
284+
let ready t = Fiber.Ivar.read t.ready
280285

281286
let serve (t : t) =
282287
let* async = Worker.create () in
283288
match t.state with
284289
| `Closed -> Code_error.raise "already closed" []
285290
| `Running _ -> Code_error.raise "already running" []
286291
| `Init fd ->
287-
let+ transport =
292+
let* transport =
288293
Worker.task_exn async ~f:(fun () ->
289294
Transport.create fd t.sockaddr ~backlog:t.backlog)
290295
in
291296
t.state <- `Running transport;
297+
let+ () = Fiber.Ivar.fill t.ready () in
292298
let accept () =
293299
Worker.task async ~f:(fun () ->
294300
Transport.accept transport

Diff for: src/csexp_rpc/csexp_rpc.mli

+5-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ module Server : sig
4848
(** RPC Server *)
4949
type t
5050

51-
val create : Unix.sockaddr -> backlog:int -> t
51+
val create : Unix.sockaddr -> backlog:int -> (t, [ `Already_in_use ]) result
52+
53+
(** [ready t] returns a fiber that completes when clients can start connecting
54+
to the server *)
55+
val ready : t -> unit Fiber.t
5256

5357
val stop : t -> unit
5458

Diff for: src/dune_engine/dune_engine.ml

+1
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@ module Report_errors_config = Report_errors_config
4747
module Compound_user_error = Compound_user_error
4848
module Reflection = Reflection
4949
module No_io = No_io
50+
module Rpc = Rpc

Diff for: src/dune_engine/rpc.ml

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
open Fiber.O
2+
3+
type server =
4+
{ run : unit Fiber.t
5+
; stop : unit Fiber.t
6+
; ready : unit Fiber.t
7+
}
8+
9+
type t =
10+
{ server : server
11+
; pool : Fiber.Pool.t
12+
}
13+
14+
let t = Fiber.Var.create ()
15+
16+
let with_background_rpc server f =
17+
let pool = Fiber.Pool.create () in
18+
Fiber.Var.set t { server; pool } (fun () ->
19+
Fiber.fork_and_join_unit
20+
(fun () -> Fiber.Pool.run pool)
21+
(fun () -> Fiber.finalize f ~finally:(fun () -> Fiber.Pool.stop pool)))
22+
23+
let ensure_ready () =
24+
let* { server; pool } = Fiber.Var.get_exn t in
25+
let* () = Fiber.Pool.task pool ~f:(fun () -> server.run) in
26+
server.ready
27+
28+
let stop () =
29+
let* { server; pool } = Fiber.Var.get_exn t in
30+
Fiber.fork_and_join_unit
31+
(fun () -> Fiber.Pool.stop pool)
32+
(fun () -> server.stop)

Diff for: src/dune_engine/rpc.mli

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
type server =
2+
{ run : unit Fiber.t
3+
; stop : unit Fiber.t
4+
; ready : unit Fiber.t
5+
}
6+
7+
val with_background_rpc : server -> (unit -> 'a Fiber.t) -> 'a Fiber.t
8+
9+
val ensure_ready : unit -> unit Fiber.t
10+
11+
val stop : unit -> unit Fiber.t

Diff for: src/dune_rpc_impl/server.ml

+42-38
Original file line numberDiff line numberDiff line change
@@ -35,39 +35,29 @@ module Config = struct
3535
type t =
3636
{ handler : Dune_rpc_server.t
3737
; pool : Fiber.Pool.t
38-
; backlog : int
3938
; root : string
4039
; where : Dune_rpc.Where.t
40+
; server : Csexp_rpc.Server.t
41+
; stats : Dune_stats.t option
42+
; registry : [ `Add | `Skip ]
4143
}
4244
end
4345

4446
module Run = struct
4547
module Registry = Dune_rpc_private.Registry
4648
module Server = Dune_rpc_server.Make (Csexp_rpc.Session)
47-
48-
type t =
49-
{ server : Csexp_rpc.Server.t
50-
; handler : Dune_rpc_server.t
51-
; pool : Fiber.Pool.t
52-
; where : Dune_rpc.Where.t
53-
; stats : Dune_stats.t option
54-
; root : string
55-
}
49+
open Config
5650

5751
let t_var : t Fiber.Var.t = Fiber.Var.create ()
5852

59-
let of_config { Config.handler; backlog; pool; root; where } stats =
60-
let () =
61-
let socket_file = Where.rpc_socket_file () in
62-
Path.mkdir_p (Path.build (Path.Build.parent_exn socket_file));
63-
at_exit (fun () -> Path.Build.unlink_no_err socket_file)
64-
in
65-
let server = Csexp_rpc.Server.create (Where.to_socket where) ~backlog in
66-
{ server; handler; stats; pool; root; where }
67-
6853
let run t =
6954
Fiber.Var.set t_var t @@ fun () ->
7055
let cleanup_registry = ref None in
56+
let with_registry f =
57+
match t.registry with
58+
| `Skip -> ()
59+
| `Add -> f ()
60+
in
7161
let run_cleanup_registry () =
7262
match !cleanup_registry with
7363
| None -> ()
@@ -86,6 +76,7 @@ module Run = struct
8676
(fun () ->
8777
let* sessions = Csexp_rpc.Server.serve t.server in
8878
let () =
79+
with_registry @@ fun () ->
8980
let (`Caller_should_write { Registry.File.path; contents }) =
9081
let registry_config =
9182
Registry.Config.create (Lazy.force Dune_util.xdg)
@@ -117,21 +108,10 @@ module Run = struct
117108
(fun () -> Fiber.Pool.run t.pool)
118109
in
119110
Fiber.finalize (with_print_errors run) ~finally:(fun () ->
120-
run_cleanup_registry ();
111+
with_registry run_cleanup_registry;
121112
Fiber.return ())
122-
123-
let stop () =
124-
let open Fiber.O in
125-
let* t = Fiber.Var.get t_var in
126-
match t with
127-
| None -> Code_error.raise "rpc not running" []
128-
| Some s ->
129-
Csexp_rpc.Server.stop s.server;
130-
Fiber.return ()
131113
end
132114

133-
let stop = Run.stop
134-
135115
type pending_build_action =
136116
| Build of Dep_conf.t list * Build_outcome.t Fiber.Ivar.t
137117

@@ -232,9 +212,16 @@ type t =
232212
; pending_build_jobs :
233213
(Dep_conf.t list * Build_outcome.t Fiber.Ivar.t) Job_queue.t
234214
; mutable clients : Clients.t
235-
; stats : Dune_stats.t option
236215
}
237216

217+
let ready (t : t) =
218+
let* () = Fiber.return () in
219+
Csexp_rpc.Server.ready t.config.server
220+
221+
let stop (t : t) =
222+
let+ () = Fiber.return () in
223+
Csexp_rpc.Server.stop t.config.server
224+
238225
let handler (t : t Fdecl.t) : 'a Dune_rpc_server.Handler.t =
239226
let on_init session (_ : Initialize.Request.t) =
240227
let t = Fdecl.get t in
@@ -339,7 +326,9 @@ let handler (t : t Fdecl.t) : 'a Dune_rpc_server.Handler.t =
339326
~f:(fun (_, entry) -> Session.Stage1.request_close entry.session))
340327
in
341328
let shutdown () =
342-
Fiber.fork_and_join_unit Dune_engine.Scheduler.shutdown Run.stop
329+
Fiber.fork_and_join_unit Dune_engine.Scheduler.shutdown (fun () ->
330+
Csexp_rpc.Server.stop t.config.server;
331+
Fiber.return ())
343332
in
344333
Fiber.fork_and_join_unit terminate_sessions shutdown
345334
in
@@ -411,24 +400,39 @@ let handler (t : t Fdecl.t) : 'a Dune_rpc_server.Handler.t =
411400
in
412401
rpc
413402

414-
let create ~root stats =
403+
let create ~registry ~root stats =
415404
let t = Fdecl.create Dyn.opaque in
416405
let pending_build_jobs = Job_queue.create () in
417406
let handler = Dune_rpc_server.make (handler t) in
418407
let pool = Fiber.Pool.create () in
419408
let where = Where.default () in
420-
let config = { Config.handler; backlog = 10; pool; root; where } in
421-
let res = { config; pending_build_jobs; clients = Clients.empty; stats } in
409+
let server =
410+
let socket_file = Where.rpc_socket_file () in
411+
Path.mkdir_p (Path.build (Path.Build.parent_exn socket_file));
412+
match Csexp_rpc.Server.create (Where.to_socket where) ~backlog:10 with
413+
| Ok s ->
414+
at_exit (fun () -> Path.Build.unlink_no_err socket_file);
415+
s
416+
| Error `Already_in_use ->
417+
User_error.raise
418+
[ Pp.textf
419+
"Dune already running in this build directory. If this is not the \
420+
case, delete %s"
421+
(Path.Build.to_string (Where.rpc_socket_file ()))
422+
]
423+
in
424+
let config = { Config.handler; pool; root; where; stats; server; registry } in
425+
let res = { config; pending_build_jobs; clients = Clients.empty } in
422426
Fdecl.set t res;
423427
res
424428

425429
let listening_address t = t.config.where
426430

427431
let run t =
428432
let* () = Fiber.return () in
429-
Run.run (Run.of_config t.config t.stats)
433+
Run.run t.config
430434

431-
let stats (t : t) = t.stats
435+
let stats (t : t) = t.config.stats
432436

433437
let pending_build_action t =
434438
Job_queue.read t.pending_build_jobs

0 commit comments

Comments
 (0)