Skip to content

Commit

Permalink
Fix reuse of client port on Linux. Implement for OSX.
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed May 19, 2017
1 parent 31f798e commit c354dfd
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 36 deletions.
15 changes: 11 additions & 4 deletions base/distributed/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,15 @@ function start_worker(out::IO, cookie::AbstractString)
init_worker(cookie)
interface = IPv4(LPROC.bind_addr)
if LPROC.bind_port == 0
(actual_port,sock) = listenany(interface, UInt16(9009))
LPROC.bind_port = actual_port
addr = Base.InetAddr(interface, 0)
sock = Base.TCPServer()
if bind(sock, addr) && Base.trylisten(sock) == 0
_addr, port = Base._sockname(sock, true)
LPROC.bind_port = port
else
close(sock)
error("no ports available")
end
else
sock = listen(interface, LPROC.bind_port)
end
Expand Down Expand Up @@ -256,9 +263,9 @@ end
function parse_connection_info(str)
m = match(r"^julia_worker:(\d+)#(.*)", str)
if m !== nothing
(m.captures[2], parse(Int16, m.captures[1]))
(m.captures[2], parse(UInt16, m.captures[1]))
else
("", Int16(-1))
("", UInt16(0))
end
end

Expand Down
43 changes: 23 additions & 20 deletions base/distributed/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -455,31 +455,34 @@ end
const client_port = Ref{Cushort}(0)

function socket_reuse_port()
s = TCPSocket()
client_host = Ref{Cuint}(0)
ccall(:jl_tcp_bind, Int32,
(Ptr{Void}, UInt16, UInt32, Cuint),
s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))

# TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
# early access to a socket fd, i.e., before a bind call.

@static if is_linux()
try
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
return s
elseif rc < 0
throw(SystemError("setsockopt() SO_REUSEPORT : "))
end
getsockname(s)
catch e
@static if is_linux() || is_apple()
s = TCPSocket(delay = false)

# Linux requires the port to be bound before setting REUSEPORT, OSX after.
is_linux() && bind_client_port(s)
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
return s
elseif rc < 0
# This is an issue only on systems with lots of client connections, hence delay the warning
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to regular socket.")

# provide a clean new socket
return TCPSocket()
end
is_apple() && bind_client_port(s)
else
return TCPSocket()
end
end

function bind_client_port(s)
err = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, UInt16, UInt32, Cuint),
s.handle, hton(client_port[]), hton(UInt32(0)), 0)
Base.uv_error("bind() failed", err)

_addr, port = Base._sockname(s, true)
client_port[] = port
return s
end

Expand Down
20 changes: 13 additions & 7 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,14 @@ mutable struct TCPSocket <: LibuvStream
return tcp
end
end
function TCPSocket()

# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
function TCPSocket(; delay=true)
tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit)
err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
eventloop(), tcp.handle)
af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2

err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Void}, Ptr{Void}, Cuint),
eventloop(), tcp.handle, af_spec)
uv_error("failed to create tcp socket", err)
tcp.status = StatusInit
return tcp
Expand Down Expand Up @@ -840,16 +844,18 @@ listenany(default_port) = listenany(IPv4(UInt32(0)), default_port)
Get the IP address and the port that the given `TCPSocket` is connected to
(or bound to, in the case of `TCPServer`).
"""
function getsockname(sock::Union{TCPServer,TCPSocket})
getsockname(sock::Union{TCPServer, TCPSocket}) = _sockname(sock, isa(sock, TCPServer))

function _sockname(sock, self)
rport = Ref{Cushort}(0)
raddress = zeros(UInt8, 16)
rfamily = Ref{Cuint}(0)
r = if isa(sock, TCPServer)
ccall(:jl_tcp_getsockname, Int32,
if self
r = ccall(:jl_tcp_getsockname, Int32,
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
sock.handle, rport, raddress, rfamily)
else
ccall(:jl_tcp_getpeername, Int32,
r = ccall(:jl_tcp_getpeername, Int32,
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
sock.handle, rport, raddress, rfamily)
end
Expand Down
9 changes: 5 additions & 4 deletions doc/src/manual/parallel-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -1231,8 +1231,8 @@ as local laptops, departmental clusters, or even the cloud. This section covers
requirements for the inbuilt `LocalManager` and `SSHManager`:
* The master process does not listen on any port. It only connects out to the workers.
* Each worker binds to only one of the local interfaces and listens on the first free port starting
from `9009`.
* Each worker binds to only one of the local interfaces and listens on an ephemeral port number
assigned by the OS.
* `LocalManager`, used by `addprocs(N)`, by default binds only to the loopback interface. This means
that workers started later on remote hosts (or by anyone with malicious intentions) are unable
to connect to the cluster. An `addprocs(4)` followed by an `addprocs(["remote_host"])` will fail.
Expand All @@ -1250,8 +1250,9 @@ requirements for the inbuilt `LocalManager` and `SSHManager`:
authenticated via public key infrastructure (PKI). Authentication credentials can be supplied
via `sshflags`, for example ```sshflags=`-e <keyfile>` ```.
Note that worker-worker connections are still plain TCP and the local security policy on the remote
cluster must allow for free connections between worker nodes, at least for ports 9009 and above.
In an all-to-all topology (the default), all workers connect to each other via plain TCP sockets.
The security policy on the cluster nodes must thus ensure free connectivity between workers for
the ephemeral port range (varies by OS).
Securing and encrypting all worker-worker traffic (via SSH) or encrypting individual messages
can be done via a custom ClusterManager.
Expand Down
37 changes: 36 additions & 1 deletion test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,41 @@ include("testenv.jl")

addprocs_with_testenv(4)

# Test that the client port is reused. SO_REUSEPORT may not be supported on
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
if is_unix()
# Run the test on all processes.
results = asyncmap(procs()) do p
remotecall_fetch(p) do
ports_lower = [] # ports of pids lower than myid()
ports_higher = [] # ports of pids higher than myid()
for w in Base.Distributed.PGRP.workers
w.id == myid() && continue
port = Base._sockname(w.r_stream, true)[2]
if (w.id == 1)
# master connects to workers
push!(ports_higher, port)
elseif w.id < myid()
push!(ports_lower, port)
elseif w.id > myid()
push!(ports_higher, port)
end
end
@assert (length(ports_lower) + length(ports_higher)) == nworkers()
for portset in [ports_lower, ports_higher]
if (length(portset) > 0) && (length(unique(portset)) != 1)
warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
return 0
end
end
return myid()
end
end

# Ensure that the code has indeed been successfully executed everywhere
@test all(p -> p in results, procs())
end

id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]

Expand Down Expand Up @@ -923,7 +958,7 @@ if is_unix() # aka have ssh
end
end

remotecall_fetch(plst->rmprocs(plst; waitfor=5.0), 1, new_pids)
remotecall_fetch(rmprocs, 1, new_pids)
end

print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")
Expand Down

0 comments on commit c354dfd

Please sign in to comment.