Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict interface bindings and test connections #16292

Merged
merged 1 commit into from
May 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ function process_options(opts::JLOptions)
startup && load_juliarc()

# startup worker
if opts.worker != 0
start_worker() # does not return
if opts.worker != C_NULL
start_worker(bytestring(opts.worker)) # does not return
end
# add processors
if opts.nprocs > 0
Expand Down
7 changes: 7 additions & 0 deletions base/docs/helpdb/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10485,3 +10485,10 @@ to. This is useful when writing custom `serialize` methods for a type, which opt
data written out depending on the receiving process id.
"""
Base.worker_id_from_socket

"""
Base.cluster_cookie([cookie]) -> cookie

Returns the cluster cookie. If a cookie is passed, also sets it as the cluster cookie.
"""
Base.cluster_cookie
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to do this inline rather than adding to this file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had it inline but moved it after removing the export since we cannot have the function definition as Base.cluster_cookie.

1 change: 1 addition & 0 deletions base/initdefs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ function init_parallel()
global PGRP
global LPROC
LPROC.id = 1
cluster_cookie(randstring())
assert(isempty(PGRP.workers))
register_worker(LPROC)
end
Expand Down
44 changes: 15 additions & 29 deletions base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ end


function check_addprocs_args(kwargs)
valid_kw_names = collect(keys(default_addprocs_params()))
for keyname in kwargs
!(keyname[1] in [:dir, :exename, :exeflags, :topology]) && throw(ArgumentError("Invalid keyword argument $(keyname[1])"))
!(keyname[1] in valid_kw_names) && throw(ArgumentError("Invalid keyword argument $(keyname[1])"))
end
end

Expand Down Expand Up @@ -93,7 +94,7 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
if length(machine_bind) > 1
exeflags = `--bind-to $(machine_bind[2]) $exeflags`
end
exeflags = `$exeflags --worker`
exeflags = `$exeflags --worker $(cluster_cookie())`

machine_def = split(machine_bind[1], ':')
# if this machine def has a port number, add the port information to the ssh flags
Expand Down Expand Up @@ -217,15 +218,15 @@ end


# LocalManager

# Cluster manager that launches worker processes on the local machine.
# `launch` starts `np` workers; when `restrict` is true they are started with
# `--bind-to 127.0.0.1`, otherwise with `--bind-to` set to `LPROC.bind_addr`.
immutable LocalManager <: ClusterManager
np::Integer # number of worker processes to launch
restrict::Bool # Restrict binding to 127.0.0.1 only
end

# With no worker count given, launch one local worker per CPU core
# (`Sys.CPU_CORES`), forwarding all keyword arguments unchanged.
function addprocs(; kwargs...)
    return addprocs(Sys.CPU_CORES; kwargs...)
end
function addprocs(np::Integer; kwargs...)
function addprocs(np::Integer; restrict=true, kwargs...)
check_addprocs_args(kwargs)
addprocs(LocalManager(np); kwargs...)
addprocs(LocalManager(np, restrict); kwargs...)
end

# Print a fixed textual representation of a LocalManager, followed by a newline.
function show(io::IO, manager::LocalManager)
    return println(io, "LocalManager()")
end
Expand All @@ -234,10 +235,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
dir = params[:dir]
exename = params[:exename]
exeflags = params[:exeflags]
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`

for i in 1:manager.np
io, pobj = open(pipeline(detach(
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $(LPROC.bind_addr) --worker`, dir=dir)),
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`, dir=dir)),
stderr=STDERR), "r")
wconfig = WorkerConfig()
wconfig.process = pobj
Expand Down Expand Up @@ -331,15 +333,7 @@ function connect_w2w(pid::Int, config::WorkerConfig)
(rhost, rport) = get(config.connect_at)
config.host = rhost
config.port = rport
if get(get(config.environ), :self_is_local, false) && get(get(config.environ), :r_is_local, false)
# If on localhost, use the loopback address - this addresses
# the special case of system suspend wherein the local ip
# may be changed upon system awake.
(s, bind_addr) = connect_to_worker("127.0.0.1", rport)
else
(s, bind_addr)= connect_to_worker(rhost, rport)
end

(s, bind_addr) = connect_to_worker(rhost, rport)
(s,s)
end

Expand Down Expand Up @@ -375,24 +369,16 @@ function socket_reuse_port()
end

function connect_to_worker(host::AbstractString, port::Integer)
# Connect to the loopback port if requested host has the same ipaddress as self.
s = socket_reuse_port()
if host == string(LPROC.bind_addr)
s = connect(s, "127.0.0.1", UInt16(port))
else
s = connect(s, host, UInt16(port))
end
connect(s, host, UInt16(port))

# Avoid calling getaddrinfo if possible - involves a DNS lookup
# host may be a stringified ipv4 / ipv6 address or a dns name
if host == "localhost"
bind_addr = "127.0.0.1"
else
try
bind_addr = string(parse(IPAddr,host))
catch
bind_addr = string(getaddrinfo(host))
end
bind_addr = nothing
try
bind_addr = string(parse(IPAddr,host))
catch
bind_addr = string(getaddrinfo(host))
end
(s, bind_addr)
end
Expand Down
67 changes: 45 additions & 22 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,24 @@ end
# Worker initialization messages
# Sent by a process to a peer right after connecting to it (see `connect_to_peer`).
# The receiving side registers a `Worker` for `from_pid` on that connection and
# replies with an `IdentifySocketAckMsg`.
type IdentifySocketMsg <: AbstractMsg
from_pid::Int # id of the connecting process (`myid()` on the sender)
cookie::AbstractString # sender's cluster cookie; compared against the receiver's when this is the first message on a connection
end
# Reply sent by the `IdentifySocketMsg` handler to the newly registered peer.
# The handler for this message itself does nothing.
type IdentifySocketAckMsg <: AbstractMsg
cookie::AbstractString # cluster cookie of the replying process
end
type JoinPGRPMsg <: AbstractMsg
self_pid::Int
other_workers::Array
self_is_local::Bool
notify_oid::RRID
topology::Symbol
worker_pool
cookie::AbstractString
end
# Sent by a worker to the controller once it has finished handling a `JoinPGRPMsg`.
type JoinCompleteMsg <: AbstractMsg
notify_oid::RRID # the `notify_oid` taken from the JoinPGRPMsg being answered
cpu_cores::Int # `Sys.CPU_CORES` as seen by the worker
ospid::Int # `getpid()` of the worker process
cookie::AbstractString # worker's cluster cookie
end


Expand Down Expand Up @@ -270,11 +275,15 @@ type LocalProcess
id::Int
bind_addr::AbstractString
bind_port::UInt16
cookie::AbstractString
LocalProcess() = new(1)
end

const LPROC = LocalProcess()

# Accessors for the cluster cookie stored on the local process record `LPROC`.
# Called without arguments, return the current cookie. Called with a cookie,
# store it in `LPROC.cookie` and return the value that was passed in.
function cluster_cookie()
    return LPROC.cookie
end
function cluster_cookie(cookie)
    LPROC.cookie = cookie
    return cookie
end

const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}()
const map_sock_wrkr = ObjectIdDict()
const map_del_wrkr = Set{Int}()
Expand Down Expand Up @@ -962,19 +971,25 @@ end
# Start the message handler loop for a read/write stream pair in a newly
# scheduled task and return that task without waiting for it.
function process_messages(r_stream::IO, w_stream::IO)
    return @schedule message_handler_loop(r_stream, w_stream)
end

function message_handler_loop(r_stream::IO, w_stream::IO)
global PGRP
global cluster_manager

try
# Check for a valid first message with a cookie.
msg = deserialize(r_stream)
if !any(x->isa(msg, x), [JoinPGRPMsg, JoinCompleteMsg, IdentifySocketMsg, IdentifySocketAckMsg]) ||
(msg.cookie != cluster_cookie())

println(STDERR, "Unknown first message $(typeof(msg)) or cookie mismatch.")
error("Invalid connection credentials.")
end

while true
handle_msg(msg, r_stream, w_stream)
msg = deserialize(r_stream)
# println("got msg: ", msg)
handle_msg(msg, r_stream, w_stream)
end
catch e
iderr = worker_id_from_socket(r_stream)
if (iderr < 1)
print(STDERR, "Socket from unknown remote worker in worker ", myid())
println(STDERR, "Socket from unknown remote worker in worker $(myid())")
else
werr = worker_from_id(iderr)
oldstate = werr.state
Expand All @@ -995,8 +1010,8 @@ function message_handler_loop(r_stream::IO, w_stream::IO)
deregister_worker(iderr)
end

if isopen(r_stream) close(r_stream) end
if isopen(w_stream) close(w_stream) end
isopen(r_stream) && close(r_stream)
isopen(w_stream) && close(w_stream)

if (myid() == 1) && (iderr > 1)
if oldstate != W_TERMINATING
Expand Down Expand Up @@ -1028,7 +1043,12 @@ handle_msg(msg::RemoteDoMsg, r_stream, w_stream) = @schedule run_work_thunk(()->

# Deliver the value carried by a ResultMsg to the reference identified by
# `msg.response_oid`.
function handle_msg(msg::ResultMsg, r_stream, w_stream)
    target = lookup_ref(msg.response_oid)
    return put!(target, msg.value)
end

handle_msg(msg::IdentifySocketMsg, r_stream, w_stream) = Worker(msg.from_pid, r_stream, w_stream, cluster_manager)
function handle_msg(msg::IdentifySocketMsg, r_stream, w_stream)
# register a new peer worker connection
w=Worker(msg.from_pid, r_stream, w_stream, cluster_manager)
send_msg_now(w, IdentifySocketAckMsg(cluster_cookie()))
end
# An IdentifySocketAckMsg requires no action on receipt.
function handle_msg(msg::IdentifySocketAckMsg, r_stream, w_stream)
    return nothing
end

function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream)
LPROC.id = msg.self_pid
Expand All @@ -1037,10 +1057,9 @@ function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream)
topology(msg.topology)

wait_tasks = Task[]
for (connect_at, rpid, r_is_local) in msg.other_workers
for (connect_at, rpid) in msg.other_workers
wconfig = WorkerConfig()
wconfig.connect_at = connect_at
wconfig.environ = AnyDict(:self_is_local=>msg.self_is_local, :r_is_local=>r_is_local)

let rpid=rpid, wconfig=wconfig
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
Expand All @@ -1052,24 +1071,24 @@ function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream)

set_default_worker_pool(msg.worker_pool)

send_msg_now(controller, JoinCompleteMsg(msg.notify_oid, Sys.CPU_CORES, getpid()))
send_msg_now(controller, JoinCompleteMsg(msg.notify_oid, Sys.CPU_CORES, getpid(), cluster_cookie()))
end

function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
try
(r_s, w_s) = connect(manager, rpid, wconfig)
w = Worker(rpid, r_s, w_s, manager, wconfig)
process_messages(w.r_stream, w.w_stream)
send_msg_now(w, IdentifySocketMsg(myid()))
send_msg_now(w, IdentifySocketMsg(myid(), cluster_cookie()))
catch e
display_error(e, catch_backtrace())
println(STDERR, "Error [$e] on $(myid()) while connecting to peer $rpid. Exiting.")
exit(1)
end
end

function handle_msg(msg::JoinCompleteMsg, r_stream, w_stream)
w = map_sock_wrkr[r_stream]

environ = get(w.config.environ, Dict())
environ[:cpu_cores] = msg.cpu_cores
w.config.environ = environ
Expand All @@ -1093,8 +1112,8 @@ worker_timeout() = parse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "60.0"))
# The entry point for julia worker processes. does not return. Used for TCP transport.
# Cluster managers implementing their own transport will provide their own.
# Arguments are the descriptor to write the listening port # to, and the cluster cookie.
start_worker() = start_worker(STDOUT)
function start_worker(out::IO)
start_worker(cookie::AbstractString) = start_worker(STDOUT, cookie)
function start_worker(out::IO, cookie::AbstractString)
# we only explicitly monitor worker STDOUT on the console, so redirect
# stderr to stdout so we can see the output.
# at some point we might want some or all worker output to go to log
Expand All @@ -1103,12 +1122,13 @@ function start_worker(out::IO)
# exit when process 1 shut down. Don't yet know why.
#redirect_stderr(STDOUT)

init_worker()
init_worker(cookie)
interface = IPv4(LPROC.bind_addr)
if LPROC.bind_port == 0
(actual_port,sock) = listenany(UInt16(9009))
(actual_port,sock) = listenany(interface, UInt16(9009))
LPROC.bind_port = actual_port
else
sock = listen(LPROC.bind_port)
sock = listen(interface, LPROC.bind_port)
end
@schedule while isopen(sock)
client = accept(sock)
Expand Down Expand Up @@ -1180,7 +1200,7 @@ function parse_connection_info(str)
end
end

function init_worker(manager::ClusterManager=DefaultClusterManager())
function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
# On workers, the default cluster manager connects via TCP sockets. Custom
# transports will need to call this function with their own manager.
global cluster_manager
Expand All @@ -1195,6 +1215,9 @@ function init_worker(manager::ClusterManager=DefaultClusterManager())
# System is started in head node mode, cleanup entries related to the same
empty!(PGRP.workers)
empty!(map_pid_wrkr)

cluster_cookie(cookie)
nothing
end


Expand Down Expand Up @@ -1391,8 +1414,8 @@ function create_worker(manager, wconfig)
end
end

all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), join_list)
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology, default_worker_pool()))
all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list)
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, ntfy_oid, PGRP.topology, default_worker_pool(), cluster_cookie()))

@schedule manage(w.manager, w.id, w.config, :register)
wait(rr_ntfy_join)
Expand Down
2 changes: 1 addition & 1 deletion base/options.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ immutable JLOptions
depwarn::Int8
can_inline::Int8
fast_math::Int8
worker::Int8
worker::Ptr{UInt8}
handle_signals::Int8
use_precompiled::Int8
use_compilecache::Int8
Expand Down
5 changes: 3 additions & 2 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -743,8 +743,8 @@ end

## Utility functions

function listenany(default_port)
addr = InetAddr(IPv4(UInt32(0)),default_port)
function listenany(host::IPAddr, default_port)
addr = InetAddr(host, default_port)
while true
sock = TCPServer()
if bind(sock,addr) && _listen(sock) == 0
Expand All @@ -757,6 +757,7 @@ function listenany(default_port)
end
end
end
# Convenience method for callers that pass only a port: delegate to the
# host-taking method using the IPv4 address built from UInt32(0) (0.0.0.0).
function listenany(default_port)
    any_host = IPv4(UInt32(0))
    return listenany(any_host, default_port)
end

function getsockname(sock::Union{TCPServer,TCPSocket})
rport = Ref{Cushort}(0)
Expand Down
Loading