Skip to content

Commit 8f4e33f

Browse files
committed
Limit binding to one interface only. Implement cluster cookie. (#16292)
1 parent 38bdc6e commit 8f4e33f

File tree

15 files changed

+147
-72
lines changed

15 files changed

+147
-72
lines changed

base/client.jl

+2-2
Original file line number | Diff line number | Diff line change
@@ -217,8 +217,8 @@ function process_options(opts::JLOptions)
217217
startup && load_juliarc()
218218

219219
# startup worker
220-
if opts.worker != 0
221-
start_worker() # does not return
220+
if opts.worker != C_NULL
221+
start_worker(bytestring(opts.worker)) # does not return
222222
end
223223
# add processors
224224
if opts.nprocs > 0

base/docs/helpdb/Base.jl

+7
Original file line number | Diff line number | Diff line change
@@ -10474,3 +10474,10 @@ to. This is useful when writing custom `serialize` methods for a type, which opt
1047410474
data written out depending on the receiving process id.
1047510475
"""
1047610476
Base.worker_id_from_socket
10477+
10478+
"""
10479+
Base.cluster_cookie([cookie]) -> cookie
10480+
10481+
Returns the cluster cookie. If a cookie is passed, also sets it as the cluster cookie.
10482+
"""
10483+
Base.cluster_cookie

base/initdefs.jl

+1
Original file line number | Diff line number | Diff line change
@@ -73,6 +73,7 @@ function init_parallel()
7373
global PGRP
7474
global LPROC
7575
LPROC.id = 1
76+
cluster_cookie(randstring())
7677
assert(isempty(PGRP.workers))
7778
register_worker(LPROC)
7879
end

base/managers.jl

+15-29
Original file line number | Diff line number | Diff line change
@@ -35,8 +35,9 @@ end
3535

3636

3737
function check_addprocs_args(kwargs)
38+
valid_kw_names = collect(keys(default_addprocs_params()))
3839
for keyname in kwargs
39-
!(keyname[1] in [:dir, :exename, :exeflags, :topology]) && throw(ArgumentError("Invalid keyword argument $(keyname[1])"))
40+
!(keyname[1] in valid_kw_names) && throw(ArgumentError("Invalid keyword argument $(keyname[1])"))
4041
end
4142
end
4243

@@ -93,7 +94,7 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
9394
if length(machine_bind) > 1
9495
exeflags = `--bind-to $(machine_bind[2]) $exeflags`
9596
end
96-
exeflags = `$exeflags --worker`
97+
exeflags = `$exeflags --worker $(cluster_cookie())`
9798

9899
machine_def = split(machine_bind[1], ':')
99100
# if this machine def has a port number, add the port information to the ssh flags
@@ -217,15 +218,15 @@ end
217218

218219

219220
# LocalManager
220-
221221
immutable LocalManager <: ClusterManager
222222
np::Integer
223+
restrict::Bool # Restrict binding to 127.0.0.1 only
223224
end
224225

225226
addprocs(; kwargs...) = addprocs(Sys.CPU_CORES; kwargs...)
226-
function addprocs(np::Integer; kwargs...)
227+
function addprocs(np::Integer; restrict=true, kwargs...)
227228
check_addprocs_args(kwargs)
228-
addprocs(LocalManager(np); kwargs...)
229+
addprocs(LocalManager(np, restrict); kwargs...)
229230
end
230231

231232
show(io::IO, manager::LocalManager) = println(io, "LocalManager()")
@@ -234,10 +235,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
234235
dir = params[:dir]
235236
exename = params[:exename]
236237
exeflags = params[:exeflags]
238+
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
237239

238240
for i in 1:manager.np
239241
io, pobj = open(pipeline(detach(
240-
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $(LPROC.bind_addr) --worker`, dir=dir)),
242+
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`, dir=dir)),
241243
stderr=STDERR), "r")
242244
wconfig = WorkerConfig()
243245
wconfig.process = pobj
@@ -331,15 +333,7 @@ function connect_w2w(pid::Int, config::WorkerConfig)
331333
(rhost, rport) = get(config.connect_at)
332334
config.host = rhost
333335
config.port = rport
334-
if get(get(config.environ), :self_is_local, false) && get(get(config.environ), :r_is_local, false)
335-
# If on localhost, use the loopback address - this addresses
336-
# the special case of system suspend wherein the local ip
337-
# may be changed upon system awake.
338-
(s, bind_addr) = connect_to_worker("127.0.0.1", rport)
339-
else
340-
(s, bind_addr)= connect_to_worker(rhost, rport)
341-
end
342-
336+
(s, bind_addr) = connect_to_worker(rhost, rport)
343337
(s,s)
344338
end
345339

@@ -375,24 +369,16 @@ function socket_reuse_port()
375369
end
376370

377371
function connect_to_worker(host::AbstractString, port::Integer)
378-
# Connect to the loopback port if requested host has the same ipaddress as self.
379372
s = socket_reuse_port()
380-
if host == string(LPROC.bind_addr)
381-
s = connect(s, "127.0.0.1", UInt16(port))
382-
else
383-
s = connect(s, host, UInt16(port))
384-
end
373+
connect(s, host, UInt16(port))
385374

386375
# Avoid calling getaddrinfo if possible - involves a DNS lookup
387376
# host may be a stringified ipv4 / ipv6 address or a dns name
388-
if host == "localhost"
389-
bind_addr = "127.0.0.1"
390-
else
391-
try
392-
bind_addr = string(parse(IPAddr,host))
393-
catch
394-
bind_addr = string(getaddrinfo(host))
395-
end
377+
bind_addr = nothing
378+
try
379+
bind_addr = string(parse(IPAddr,host))
380+
catch
381+
bind_addr = string(getaddrinfo(host))
396382
end
397383
(s, bind_addr)
398384
end

base/multi.jl

+45-22
Original file line number | Diff line number | Diff line change
@@ -61,19 +61,24 @@ end
6161
# Worker initialization messages
6262
type IdentifySocketMsg <: AbstractMsg
6363
from_pid::Int
64+
cookie::AbstractString
65+
end
66+
type IdentifySocketAckMsg <: AbstractMsg
67+
cookie::AbstractString
6468
end
6569
type JoinPGRPMsg <: AbstractMsg
6670
self_pid::Int
6771
other_workers::Array
68-
self_is_local::Bool
6972
notify_oid::RRID
7073
topology::Symbol
7174
worker_pool
75+
cookie::AbstractString
7276
end
7377
type JoinCompleteMsg <: AbstractMsg
7478
notify_oid::RRID
7579
cpu_cores::Int
7680
ospid::Int
81+
cookie::AbstractString
7782
end
7883

7984

@@ -270,11 +275,15 @@ type LocalProcess
270275
id::Int
271276
bind_addr::AbstractString
272277
bind_port::UInt16
278+
cookie::AbstractString
273279
LocalProcess() = new(1)
274280
end
275281

276282
const LPROC = LocalProcess()
277283

284+
cluster_cookie() = LPROC.cookie
285+
cluster_cookie(cookie) = (LPROC.cookie = cookie; cookie)
286+
278287
const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}()
279288
const map_sock_wrkr = ObjectIdDict()
280289
const map_del_wrkr = Set{Int}()
@@ -962,19 +971,25 @@ end
962971
process_messages(r_stream::IO, w_stream::IO) = @schedule message_handler_loop(r_stream, w_stream)
963972

964973
function message_handler_loop(r_stream::IO, w_stream::IO)
965-
global PGRP
966-
global cluster_manager
967-
968974
try
975+
# Check for a valid first message with a cookie.
976+
msg = deserialize(r_stream)
977+
if !any(x->isa(msg, x), [JoinPGRPMsg, JoinCompleteMsg, IdentifySocketMsg, IdentifySocketAckMsg]) ||
978+
(msg.cookie != cluster_cookie())
979+
980+
println(STDERR, "Unknown first message $(typeof(msg)) or cookie mismatch.")
981+
error("Invalid connection credentials.")
982+
end
983+
969984
while true
985+
handle_msg(msg, r_stream, w_stream)
970986
msg = deserialize(r_stream)
971987
# println("got msg: ", msg)
972-
handle_msg(msg, r_stream, w_stream)
973988
end
974989
catch e
975990
iderr = worker_id_from_socket(r_stream)
976991
if (iderr < 1)
977-
print(STDERR, "Socket from unknown remote worker in worker ", myid())
992+
println(STDERR, "Socket from unknown remote worker in worker $(myid())")
978993
else
979994
werr = worker_from_id(iderr)
980995
oldstate = werr.state
@@ -995,8 +1010,8 @@ function message_handler_loop(r_stream::IO, w_stream::IO)
9951010
deregister_worker(iderr)
9961011
end
9971012

998-
if isopen(r_stream) close(r_stream) end
999-
if isopen(w_stream) close(w_stream) end
1013+
isopen(r_stream) && close(r_stream)
1014+
isopen(w_stream) && close(w_stream)
10001015

10011016
if (myid() == 1) && (iderr > 1)
10021017
if oldstate != W_TERMINATING
@@ -1028,7 +1043,12 @@ handle_msg(msg::RemoteDoMsg, r_stream, w_stream) = @schedule run_work_thunk(()->
10281043

10291044
handle_msg(msg::ResultMsg, r_stream, w_stream) = put!(lookup_ref(msg.response_oid), msg.value)
10301045

1031-
handle_msg(msg::IdentifySocketMsg, r_stream, w_stream) = Worker(msg.from_pid, r_stream, w_stream, cluster_manager)
1046+
function handle_msg(msg::IdentifySocketMsg, r_stream, w_stream)
1047+
# register a new peer worker connection
1048+
w=Worker(msg.from_pid, r_stream, w_stream, cluster_manager)
1049+
send_msg_now(w, IdentifySocketAckMsg(cluster_cookie()))
1050+
end
1051+
handle_msg(msg::IdentifySocketAckMsg, r_stream, w_stream) = nothing
10321052

10331053
function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream)
10341054
LPROC.id = msg.self_pid
@@ -1037,10 +1057,9 @@ function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream)
10371057
topology(msg.topology)
10381058

10391059
wait_tasks = Task[]
1040-
for (connect_at, rpid, r_is_local) in msg.other_workers
1060+
for (connect_at, rpid) in msg.other_workers
10411061
wconfig = WorkerConfig()
10421062
wconfig.connect_at = connect_at
1043-
wconfig.environ = AnyDict(:self_is_local=>msg.self_is_local, :r_is_local=>r_is_local)
10441063

10451064
let rpid=rpid, wconfig=wconfig
10461065
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
@@ -1052,24 +1071,24 @@ function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream)
10521071

10531072
set_default_worker_pool(msg.worker_pool)
10541073

1055-
send_msg_now(controller, JoinCompleteMsg(msg.notify_oid, Sys.CPU_CORES, getpid()))
1074+
send_msg_now(controller, JoinCompleteMsg(msg.notify_oid, Sys.CPU_CORES, getpid(), cluster_cookie()))
10561075
end
10571076

10581077
function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
10591078
try
10601079
(r_s, w_s) = connect(manager, rpid, wconfig)
10611080
w = Worker(rpid, r_s, w_s, manager, wconfig)
10621081
process_messages(w.r_stream, w.w_stream)
1063-
send_msg_now(w, IdentifySocketMsg(myid()))
1082+
send_msg_now(w, IdentifySocketMsg(myid(), cluster_cookie()))
10641083
catch e
1084+
display_error(e, catch_backtrace())
10651085
println(STDERR, "Error [$e] on $(myid()) while connecting to peer $rpid. Exiting.")
10661086
exit(1)
10671087
end
10681088
end
10691089

10701090
function handle_msg(msg::JoinCompleteMsg, r_stream, w_stream)
10711091
w = map_sock_wrkr[r_stream]
1072-
10731092
environ = get(w.config.environ, Dict())
10741093
environ[:cpu_cores] = msg.cpu_cores
10751094
w.config.environ = environ
@@ -1093,8 +1112,8 @@ worker_timeout() = parse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "60.0"))
10931112
# The entry point for julia worker processes. does not return. Used for TCP transport.
10941113
# Cluster managers implementing their own transport will provide their own.
10951114
# Argument is descriptor to write listening port # to.
1096-
start_worker() = start_worker(STDOUT)
1097-
function start_worker(out::IO)
1115+
start_worker(cookie::AbstractString) = start_worker(STDOUT, cookie)
1116+
function start_worker(out::IO, cookie::AbstractString)
10981117
# we only explicitly monitor worker STDOUT on the console, so redirect
10991118
# stderr to stdout so we can see the output.
11001119
# at some point we might want some or all worker output to go to log
@@ -1103,12 +1122,13 @@ function start_worker(out::IO)
11031122
# exit when process 1 shut down. Don't yet know why.
11041123
#redirect_stderr(STDOUT)
11051124

1106-
init_worker()
1125+
init_worker(cookie)
1126+
interface = IPv4(LPROC.bind_addr)
11071127
if LPROC.bind_port == 0
1108-
(actual_port,sock) = listenany(UInt16(9009))
1128+
(actual_port,sock) = listenany(interface, UInt16(9009))
11091129
LPROC.bind_port = actual_port
11101130
else
1111-
sock = listen(LPROC.bind_port)
1131+
sock = listen(interface, LPROC.bind_port)
11121132
end
11131133
@schedule while isopen(sock)
11141134
client = accept(sock)
@@ -1180,7 +1200,7 @@ function parse_connection_info(str)
11801200
end
11811201
end
11821202

1183-
function init_worker(manager::ClusterManager=DefaultClusterManager())
1203+
function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
11841204
# On workers, the default cluster manager connects via TCP sockets. Custom
11851205
# transports will need to call this function with their own manager.
11861206
global cluster_manager
@@ -1195,6 +1215,9 @@ function init_worker(manager::ClusterManager=DefaultClusterManager())
11951215
# System is started in head node mode, cleanup entries related to the same
11961216
empty!(PGRP.workers)
11971217
empty!(map_pid_wrkr)
1218+
1219+
cluster_cookie(cookie)
1220+
nothing
11981221
end
11991222

12001223

@@ -1391,8 +1414,8 @@ function create_worker(manager, wconfig)
13911414
end
13921415
end
13931416

1394-
all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), join_list)
1395-
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology, default_worker_pool()))
1417+
all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list)
1418+
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, ntfy_oid, PGRP.topology, default_worker_pool(), cluster_cookie()))
13961419

13971420
@schedule manage(w.manager, w.id, w.config, :register)
13981421
wait(rr_ntfy_join)

base/options.jl

+1-1
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ immutable JLOptions
2525
depwarn::Int8
2626
can_inline::Int8
2727
fast_math::Int8
28-
worker::Int8
28+
worker::Ptr{UInt8}
2929
handle_signals::Int8
3030
use_precompiled::Int8
3131
use_compilecache::Int8

base/socket.jl

+3-2
Original file line number | Diff line number | Diff line change
@@ -743,8 +743,8 @@ end
743743

744744
## Utility functions
745745

746-
function listenany(default_port)
747-
addr = InetAddr(IPv4(UInt32(0)),default_port)
746+
function listenany(host::IPAddr, default_port)
747+
addr = InetAddr(host, default_port)
748748
while true
749749
sock = TCPServer()
750750
if bind(sock,addr) && _listen(sock) == 0
@@ -757,6 +757,7 @@ function listenany(default_port)
757757
end
758758
end
759759
end
760+
listenany(default_port) = listenany(IPv4(UInt32(0)),default_port)
760761

761762
function getsockname(sock::Union{TCPServer,TCPSocket})
762763
rport = Ref{Cushort}(0)

0 commit comments

Comments
 (0)