Skip to content

Commit a8536c3

Browse files
committed
Merge pull request #12158 from JuliaLang/amitm/slurm1000
reuse client port number if possible
2 parents a8e9189 + 40c622b commit a8536c3

File tree

3 files changed

+73
-19
lines changed

3 files changed

+73
-19
lines changed

base/managers.jl

+24-2
Original file line numberDiff line numberDiff line change
@@ -293,13 +293,35 @@ function connect_w2w(pid::Int, config::WorkerConfig)
293293
(s,s)
294294
end
295295

296+
# Port number used for the client side of outgoing worker connections.
# Starts at 0; socket_reuse_port() passes the current value to bind() and then
# overwrites it with the port reported by getsockname().
const client_port = Ref{Cushort}(0)
297+
298+
# Create a TCPSocket bound to the port stored in `client_port`, with
# SO_REUSEPORT requested on it, and record the port actually bound back into
# `client_port`.
#
# Returns the bound socket on success. If any of the three native calls
# (bind, setsockopt, getsockname) reports a negative result, the resulting
# SystemError is caught, a warning is issued via warn_once, and a fresh,
# unbound TCPSocket is returned instead, so this function does not propagate
# those errors to the caller.
function socket_reuse_port()
299+
s = TCPSocket()
300+
try
301+
client_host = Ref{Cuint}(0)
302+
# Bind to host 0 and the remembered client port, both converted with hton.
# presumably host 0 means "any address" and port 0 (the initial value) lets
# the OS choose a port -- TODO confirm against jl_tcp_bind.
ccall(:jl_tcp_bind, Int32,
303+
(Ptr{Void}, UInt16, UInt32, Cuint),
304+
s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))
305+
306+
# NOTE(review): SO_REUSEPORT is requested after bind(); confirm this ordering
# is intentional.
ccall(:jl_tcp_reuseport, Int32, (Ptr{Void}, ), s.handle) < 0 && throw(SystemError("setsockopt() SO_REUSEPORT : "))
307+
# Read back the bound address; this writes the bound port into the
# module-level `client_port`, which later calls then pass to bind().
ccall(:jl_tcp_getsockname_v4, Int32,
308+
(Ptr{Void}, Ref{Cuint}, Ref{Cushort}),
309+
s.handle, client_host, client_port) < 0 && throw(SystemError("getsockname() : "))
310+
catch e
311+
warn_once("Unable to reuse port : ", e)
312+
# provide a clean new socket
# NOTE(review): `s` is not closed on this path -- verify it is released
# elsewhere (e.g. by a finalizer).
313+
return TCPSocket()
314+
end
315+
return s
316+
end
296317

297318
function connect_to_worker(host::AbstractString, port::Integer)
298319
# Connect to the loopback port if requested host has the same ipaddress as self.
320+
s = socket_reuse_port()
299321
if host == string(LPROC.bind_addr)
300-
s = connect("127.0.0.1", UInt16(port))
322+
s = connect(s, "127.0.0.1", UInt16(port))
301323
else
302-
s = connect(host, UInt16(port))
324+
s = connect(s, host, UInt16(port))
303325
end
304326

305327
# Avoid calling getaddrinfo if possible - involves a DNS lookup

base/multi.jl

+20-17
Original file line numberDiff line numberDiff line change
@@ -894,29 +894,32 @@ function message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream, rr_n
894894
end # end of while
895895
catch e
896896
iderr = worker_id_from_socket(r_stream)
897-
werr = worker_from_id(iderr)
898-
oldstate = werr.state
899-
set_worker_state(werr, W_TERMINATED)
900-
901-
902-
# If error occurred talking to pid 1, commit harakiri
903-
if iderr == 1
904-
if isopen(w_stream)
905-
print(STDERR, "fatal error on ", myid(), ": ")
906-
display_error(e, catch_backtrace())
897+
if (iderr < 1)
898+
print(STDERR, "Socket from unknown remote worker in worker ", myid())
899+
else
900+
werr = worker_from_id(iderr)
901+
oldstate = werr.state
902+
set_worker_state(werr, W_TERMINATED)
903+
904+
# If error occurred talking to pid 1, commit harakiri
905+
if iderr == 1
906+
if isopen(w_stream)
907+
print(STDERR, "fatal error on ", myid(), ": ")
908+
display_error(e, catch_backtrace())
909+
end
910+
exit(1)
907911
end
908-
exit(1)
909-
end
910912

911-
# Will treat any exception as death of node and cleanup
912-
# since currently we do not have a mechanism for workers to reconnect
913-
# to each other on unhandled errors
914-
deregister_worker(iderr)
913+
# Will treat any exception as death of node and cleanup
914+
# since currently we do not have a mechanism for workers to reconnect
915+
# to each other on unhandled errors
916+
deregister_worker(iderr)
917+
end
915918

916919
if isopen(r_stream) close(r_stream) end
917920
if isopen(w_stream) close(w_stream) end
918921

919-
if (myid() == 1)
922+
if (myid() == 1) && (iderr > 1)
920923
if oldstate != W_TERMINATING
921924
println(STDERR, "Worker $iderr terminated.")
922925
rethrow(e)

src/jl_uv.c

+29
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,36 @@ DLLEXPORT int jl_tcp_quickack(uv_tcp_t *handle, int on)
658658
}
659659
return 0;
660660
}
661+
662+
#endif
663+
664+
665+
// Enable SO_REUSEPORT on the file descriptor behind a libuv TCP handle.
// Returns 0 on success. Returns -1 if setsockopt() fails, or unconditionally
// when SO_REUSEPORT is not defined at compile time.
DLLEXPORT int jl_tcp_reuseport(uv_tcp_t *handle)
666+
{
667+
#if defined(SO_REUSEPORT)
668+
// The descriptor is read directly from the handle's io_watcher field.
// NOTE(review): presumably only valid once libuv has created the socket --
// confirm callers bind (or otherwise open) the handle first.
int fd = (handle)->io_watcher.fd;
669+
int yes = 1;
670+
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) {
671+
return -1;
672+
}
673+
return 0;
674+
#else
675+
// Option not available in this build: report failure to the caller.
return -1;
661676
#endif
677+
}
678+
679+
// Query the local address of a libuv TCP handle and return it as IPv4 parts.
// On success writes the address to *ip and the port to *port, both converted
// to host byte order (ntohl / ntohs), and returns 0.
// Returns -1 if uv_tcp_getsockname() reports an error; *ip and *port are left
// untouched in that case.
// NOTE(review): the result is read as a sockaddr_in without checking
// sin_family, and ip/port are not NULL-checked -- confirm callers only use
// this on IPv4 sockets with valid output pointers.
DLLEXPORT int jl_tcp_getsockname_v4(uv_tcp_t *handle, uint32_t * ip, uint16_t * port)
680+
{
681+
struct sockaddr_in name;
682+
int len = sizeof(name);
683+
if (uv_tcp_getsockname(handle, (struct sockaddr *)&name, &len)) {
684+
return -1;
685+
}
686+
687+
*ip = ntohl(name.sin_addr.s_addr);
688+
*port = ntohs(name.sin_port);
689+
return 0;
690+
}
662691

663692
#ifndef _OS_WINDOWS_
664693

0 commit comments

Comments
 (0)