diff --git a/base/distributed/cluster.jl b/base/distributed/cluster.jl
index 633dce7fef87f..de465fa0bcd89 100644
--- a/base/distributed/cluster.jl
+++ b/base/distributed/cluster.jl
@@ -153,8 +153,15 @@ function start_worker(out::IO, cookie::AbstractString)
     init_worker(cookie)
     interface = IPv4(LPROC.bind_addr)
     if LPROC.bind_port == 0
-        (actual_port,sock) = listenany(interface, UInt16(9009))
-        LPROC.bind_port = actual_port
+        addr = Base.InetAddr(interface, 0)
+        sock = Base.TCPServer()
+        if bind(sock, addr) && Base.trylisten(sock) == 0
+            _addr, port = Base._sockname(sock, true)
+            LPROC.bind_port = port
+        else
+            close(sock)
+            error("no ports available")
+        end
     else
         sock = listen(interface, LPROC.bind_port)
     end
@@ -256,9 +263,9 @@ end
 function parse_connection_info(str)
     m = match(r"^julia_worker:(\d+)#(.*)", str)
     if m !== nothing
-        (m.captures[2], parse(Int16, m.captures[1]))
+        (m.captures[2], parse(UInt16, m.captures[1]))
     else
-        ("", Int16(-1))
+        ("", UInt16(0))
     end
 end
 
diff --git a/base/distributed/managers.jl b/base/distributed/managers.jl
index 8654218f5b5a3..d7ed15a3e3115 100644
--- a/base/distributed/managers.jl
+++ b/base/distributed/managers.jl
@@ -455,31 +455,42 @@ end
 const client_port = Ref{Cushort}(0)
 
+# Return a not-yet-connected TCPSocket for an outgoing connection. On Linux/OSX an attempt
+# is made to set SO_REUSEPORT and to bind the socket to `client_port[]`; if setting the
+# option fails, or on any other platform, a fresh `TCPSocket()` is returned instead.
 function socket_reuse_port()
-    s = TCPSocket()
-    client_host = Ref{Cuint}(0)
-    ccall(:jl_tcp_bind, Int32,
-            (Ptr{Void}, UInt16, UInt32, Cuint),
-            s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))
-
-    # TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
-    # early access to a socket fd, i.e., before a bind call.
-
-    @static if is_linux()
-        try
-            rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
-            if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
-                return s
-            elseif rc < 0
-                throw(SystemError("setsockopt() SO_REUSEPORT : "))
-            end
-            getsockname(s)
-        catch e
+    @static if is_linux() || is_apple()
+        s = TCPSocket(delay = false)
+
+        # Linux requires the port to be bound before setting REUSEPORT, OSX after.
+        is_linux() && bind_client_port(s)
+        rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
+        if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
+            return s
+        elseif rc < 0
             # This is an issue only on systems with lots of client connections, hence delay the warning
-            nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
+            nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to regular socket.")
+
             # provide a clean new socket
             return TCPSocket()
         end
+        is_apple() && bind_client_port(s)
+        # Explicit return: the `&&` expression above evaluates to `false` on Linux, and
+        # as the last expression it would otherwise become this function's return value.
+        return s
+    else
+        return TCPSocket()
     end
+end
+
+# Bind `s` to port `client_port[]` (via `jl_tcp_bind`), then store the locally bound port
+# reported by `Base._sockname(s, true)` back into `client_port[]`. Returns `s`.
+function bind_client_port(s)
+    err = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, UInt16, UInt32, Cuint),
+                s.handle, hton(client_port[]), hton(UInt32(0)), 0)
+    Base.uv_error("bind() failed", err)
+
+    _addr, port = Base._sockname(s, true)
+    client_port[] = port
     return s
 end
 
diff --git a/base/socket.jl b/base/socket.jl
index 7ca828205c46c..763e864c38e06 100644
--- a/base/socket.jl
+++ b/base/socket.jl
@@ -282,10 +282,14 @@ mutable struct TCPSocket <: LibuvStream
         return tcp
     end
 end
-function TCPSocket()
+
+# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
+function TCPSocket(; delay=true)
     tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit)
-    err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
-                eventloop(), tcp.handle)
+    af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2
+
+    err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Void}, Ptr{Void}, Cuint),
+                eventloop(), tcp.handle, af_spec)
     uv_error("failed to create tcp socket", err)
     tcp.status = StatusInit
     return tcp
@@ -840,16 +844,18 @@ listenany(default_port) = listenany(IPv4(UInt32(0)), default_port)
 Get the IP address and the port that the given `TCPSocket` is connected to (or bound to,
 in the case of `TCPServer`).
 """
-function getsockname(sock::Union{TCPServer,TCPSocket})
+getsockname(sock::Union{TCPServer, TCPSocket}) = _sockname(sock, isa(sock, TCPServer))
+
+function _sockname(sock, self)
     rport = Ref{Cushort}(0)
     raddress = zeros(UInt8, 16)
     rfamily = Ref{Cuint}(0)
-    r = if isa(sock, TCPServer)
-        ccall(:jl_tcp_getsockname, Int32,
+    if self
+        r = ccall(:jl_tcp_getsockname, Int32,
                 (Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
                 sock.handle, rport, raddress, rfamily)
     else
-        ccall(:jl_tcp_getpeername, Int32,
+        r = ccall(:jl_tcp_getpeername, Int32,
                 (Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
                 sock.handle, rport, raddress, rfamily)
     end
diff --git a/doc/src/manual/parallel-computing.md b/doc/src/manual/parallel-computing.md
index 0adb65bb63dbd..e9993cefdf0aa 100644
--- a/doc/src/manual/parallel-computing.md
+++ b/doc/src/manual/parallel-computing.md
@@ -1231,8 +1231,8 @@ as local laptops, departmental clusters, or even the cloud. This section covers
 requirements for the inbuilt `LocalManager` and `SSHManager`:
 
   * The master process does not listen on any port. It only connects out to the workers.
-  * Each worker binds to only one of the local interfaces and listens on the first free port starting
-    from `9009`.
+  * Each worker binds to only one of the local interfaces and listens on an ephemeral port number
+    assigned by the OS.
   * `LocalManager`, used by `addprocs(N)`, by default binds only to the loopback interface. This means
     that workers started later on remote hosts (or by anyone with malicious intentions) are unable
     to connect to the cluster. An `addprocs(4)` followed by an `addprocs(["remote_host"])` will fail.
@@ -1250,8 +1250,9 @@ requirements for the inbuilt `LocalManager` and `SSHManager`:
     authenticated via public key infrastructure (PKI). Authentication credentials can be supplied
     via `sshflags`, for example ```sshflags=`-e ` ```.
 
-    Note that worker-worker connections are still plain TCP and the local security policy on the remote
-    cluster must allow for free connections between worker nodes, at least for ports 9009 and above.
+    In an all-to-all topology (the default), all workers connect to each other via plain TCP sockets.
+    The security policy on the cluster nodes must thus ensure free connectivity between workers for
+    the ephemeral port range (varies by OS).
 
     Securing and encrypting all worker-worker traffic (via SSH) or encrypting individual messages
     can be done via a custom ClusterManager.
diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl
index 818c410505a2c..9d9c309009232 100644
--- a/test/distributed_exec.jl
+++ b/test/distributed_exec.jl
@@ -12,6 +12,41 @@ include("testenv.jl")
 
 addprocs_with_testenv(4)
 
+# Test that the client port is reused. SO_REUSEPORT may not be supported on
+# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
+if is_unix()
+    # Run the test on all processes.
+    results = asyncmap(procs()) do p
+        remotecall_fetch(p) do
+            ports_lower = [] # ports of pids lower than myid()
+            ports_higher = [] # ports of pids higher than myid()
+            for w in Base.Distributed.PGRP.workers
+                w.id == myid() && continue
+                port = Base._sockname(w.r_stream, true)[2]
+                if (w.id == 1)
+                    # master connects to workers
+                    push!(ports_higher, port)
+                elseif w.id < myid()
+                    push!(ports_lower, port)
+                elseif w.id > myid()
+                    push!(ports_higher, port)
+                end
+            end
+            @assert (length(ports_lower) + length(ports_higher)) == nworkers()
+            for portset in [ports_lower, ports_higher]
+                if (length(portset) > 0) && (length(unique(portset)) != 1)
+                    warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
+                    return 0
+                end
+            end
+            return myid()
+        end
+    end
+
+    # Ensure that the code has indeed been successfully executed everywhere
+    @test all(p -> p in results, procs())
+end
+
 id_me = myid()
 id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
 
@@ -923,7 +958,7 @@ if is_unix() # aka have ssh
             end
         end
 
-        remotecall_fetch(plst->rmprocs(plst; waitfor=5.0), 1, new_pids)
+        remotecall_fetch(rmprocs, 1, new_pids)
     end
 
     print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")