Skip to content

Commit b6000d9

Browse files
committed
reuse port if possible
1 parent 0a9ee7a commit b6000d9

File tree

3 files changed

+84
-29
lines changed

3 files changed

+84
-29
lines changed

base/managers.jl

+24-2
Original file line numberDiff line numberDiff line change
@@ -293,13 +293,35 @@ function connect_w2w(pid::Int, config::WorkerConfig)
293293
(s,s)
294294
end
295295

296+
# Local port shared by outgoing worker connections. Starts at 0 and is
# overwritten by `socket_reuse_port` with whatever port getsockname reports
# after a successful bind, so later calls try to bind to that same port.
const client_port = Ref{Cushort}(0)

# Create a TCPSocket bound to `client_port` with SO_REUSEPORT requested.
#
# Steps, in order: bind to `client_port` on address 0, enable SO_REUSEPORT via
# `jl_tcp_reuseport`, then read the bound port back into `client_port`.
# If any step reports a negative status, a warning is issued through
# `warn_once` and a fresh, unbound TCPSocket is returned instead.
#
# NOTE(review): on the fallback path the first socket is not closed here —
# confirm whether its finalizer is relied upon to release the handle.
function socket_reuse_port()
    sock = TCPSocket()
    try
        # Receives the IPv4 address from getsockname; only the port is kept.
        bound_addr = Ref{Cuint}(0)

        status = ccall(:jl_tcp_bind, Int32,
                       (Ptr{Void}, UInt16, UInt32, Cuint),
                       sock.handle, hton(client_port.x), hton(UInt32(0)), 0)
        if status < 0
            throw("bind() error")
        end

        status = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), sock.handle)
        if status < 0
            throw("SO_REUSEPORT error")
        end

        status = ccall(:jl_tcp_getsockname_v4, Int32,
                       (Ptr{Void}, Ref{Cuint}, Ref{Cushort}),
                       sock.handle, bound_addr, client_port)
        if status < 0
            throw("getsockname() error")
        end
    catch err
        warn_once("Unable to reuse port : ", err)
        # provide a clean new socket
        return TCPSocket()
    end
    return sock
end
296317

297318
function connect_to_worker(host::AbstractString, port::Integer)
298319
# Connect to the loopback port if requested host has the same ipaddress as self.
320+
s = socket_reuse_port()
299321
if host == string(LPROC.bind_addr)
300-
s = connect("127.0.0.1", UInt16(port))
322+
s = connect(s, "127.0.0.1", UInt16(port))
301323
else
302-
s = connect(host, UInt16(port))
324+
s = connect(s, host, UInt16(port))
303325
end
304326

305327
# Avoid calling getaddrinfo if possible - involves a DNS lookup

base/multi.jl

+31-27
Original file line numberDiff line numberDiff line change
@@ -778,15 +778,6 @@ notify_empty(rv::RemoteValue) = notify(rv.empty)
778778

779779
## message event handlers ##
780780

781-
# activity on accept fd
782-
function accept_handler(server::TCPServer, status::Int32)
783-
if status == -1
784-
error("an error occured during the creation of the server")
785-
end
786-
client = accept_nonblock(server)
787-
process_messages(client, client)
788-
end
789-
790781
process_messages(r_stream::TCPSocket, w_stream::TCPSocket) = process_messages(r_stream, w_stream, nothing)
791782
process_messages(r_stream::TCPSocket, w_stream::TCPSocket, rr_ntfy_join) = @schedule process_tcp_streams(r_stream, w_stream, rr_ntfy_join)
792783

@@ -903,29 +894,32 @@ function message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream, rr_n
903894
end # end of while
904895
catch e
905896
iderr = worker_id_from_socket(r_stream)
906-
werr = worker_from_id(iderr)
907-
oldstate = werr.state
908-
set_worker_state(werr, W_TERMINATED)
909-
910-
911-
# If error occured talking to pid 1, commit harakiri
912-
if iderr == 1
913-
if isopen(w_stream)
914-
print(STDERR, "fatal error on ", myid(), ": ")
915-
display_error(e, catch_backtrace())
897+
if (iderr < 1)
898+
print(STDERR, "Socket from unknown remote worker in worker ", myid())
899+
else
900+
werr = worker_from_id(iderr)
901+
oldstate = werr.state
902+
set_worker_state(werr, W_TERMINATED)
903+
904+
# If error occurred talking to pid 1, commit harakiri
905+
if iderr == 1
906+
if isopen(w_stream)
907+
print(STDERR, "fatal error on ", myid(), ": ")
908+
display_error(e, catch_backtrace())
909+
end
910+
exit(1)
916911
end
917-
exit(1)
918-
end
919912

920-
# Will treat any exception as death of node and cleanup
921-
# since currently we do not have a mechanism for workers to reconnect
922-
# to each other on unhandled errors
923-
deregister_worker(iderr)
913+
# Will treat any exception as death of node and cleanup
914+
# since currently we do not have a mechanism for workers to reconnect
915+
# to each other on unhandled errors
916+
deregister_worker(iderr)
917+
end
924918

925919
if isopen(r_stream) close(r_stream) end
926920
if isopen(w_stream) close(w_stream) end
927921

928-
if (myid() == 1)
922+
if (myid() == 1) && (iderr > 1)
929923
if oldstate != W_TERMINATING
930924
println(STDERR, "Worker $iderr terminated.")
931925
rethrow(e)
@@ -977,7 +971,17 @@ function start_worker(out::IO)
977971
else
978972
sock = listen(LPROC.bind_port)
979973
end
980-
sock.ccb = accept_handler
974+
@schedule begin
975+
while true
976+
try
977+
client = accept(sock)
978+
process_messages(client, client)
979+
catch e
980+
println(STDERR, "Error in accept() : ", e)
981+
break
982+
end
983+
end
984+
end
981985
print(out, "julia_worker:") # print header
982986
print(out, "$(dec(LPROC.bind_port))#") # print port
983987
print(out, LPROC.bind_addr)

src/jl_uv.c

+29
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,36 @@ DLLEXPORT int jl_tcp_quickack(uv_tcp_t *handle, int on)
655655
}
656656
return 0;
657657
}
658+
659+
#endif
660+
661+
662+
// Enable SO_REUSEPORT on the socket underlying `handle`.
// Returns 0 when setsockopt succeeds. Returns -1 when setsockopt fails, or
// when SO_REUSEPORT is not defined at compile time (the handle is then unused).
DLLEXPORT int jl_tcp_reuseport(uv_tcp_t *handle)
{
#if defined(SO_REUSEPORT)
    int enable = 1;
    int sockfd = handle->io_watcher.fd;
    // setsockopt returns non-zero on failure; normalise that to -1.
    return setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) ? -1 : 0;
#else
    return -1;
#endif
}
675+
676+
// Report the local IPv4 address and port that `handle` is bound to.
//   ip   - receives the address in host byte order
//   port - receives the port in host byte order
// Returns 0 on success. Returns -1 if uv_tcp_getsockname fails or if the
// socket's address family is not AF_INET; `*ip` and `*port` are left
// untouched in that case.
DLLEXPORT int jl_tcp_getsockname_v4(uv_tcp_t *handle, uint32_t * ip, uint16_t * port)
{
    struct sockaddr_in name;
    int len = sizeof(name);
    if (uv_tcp_getsockname(handle, (struct sockaddr *)&name, &len)) {
        return -1;
    }

    // The buffer is interpreted as sockaddr_in below; refuse any other family
    // rather than returning meaningless address/port values with a success code.
    if (name.sin_family != AF_INET) {
        return -1;
    }

    *ip = ntohl(name.sin_addr.s_addr);
    *port = ntohs(name.sin_port);
    return 0;
}
659688

660689
#ifndef _OS_WINDOWS_
661690

0 commit comments

Comments
 (0)