Skip to content

Commit

Permalink
support lazy all_to_all connection setups
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jul 18, 2017
1 parent 8a18928 commit 9b30e01
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 23 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ This section lists changes that do not have deprecation warnings.
* The `Diagonal` type definition has changed from `Diagonal{T}` to
`Diagonal{T,V<:AbstractVector{T}}` ([#22718]).

* Worker-worker connections are setup lazily for a `:all_to_all` topology. Use keyword
arg `lazy=false` to force all connections to be setup during a `addprocs` call. ([#22814])

Library improvements
--------------------

Expand Down
96 changes: 77 additions & 19 deletions base/distributed/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ mutable struct Worker
state::WorkerState
c_state::Condition # wait for state changes
ct_time::Float64 # creation time
conn_func::Nullable{Function} # Used to setup connections lazily

r_stream::IO
w_stream::IO
Expand All @@ -82,12 +83,13 @@ mutable struct Worker
w
end

function Worker(id::Int)
Worker(id::Int) = Worker(id, Nullable{Function}())
function Worker(id::Int, conn_func)
@assert id > 0
if haskey(map_pid_wrkr, id)
return map_pid_wrkr[id]
end
w=new(id, [], [], false, W_CREATED, Condition(), time())
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
register_worker(w)
w
end
Expand All @@ -102,21 +104,56 @@ end

function check_worker_state(w::Worker)
if w.state == W_CREATED
if PGRP.topology == :all_to_all
# Since higher pids connect with lower pids, the remote worker
# may not have connected to us yet. Wait for some time.
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

@schedule (sleep(timeout); notify(w.c_state; all=true))
wait(w.c_state)
w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
if !isclusterlazy()
if PGRP.topology == :all_to_all
# Since higher pids connect with lower pids, the remote worker
# may not have connected to us yet. Wait for some time.
wait_for_conn(w)
else
error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology))
end
else
error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology))
w.ct_time = time()
if myid() > w.id
@schedule exec_conn_func(w)
else
# route request via node 1
@schedule remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
wait_for_conn(w)
end
end
end

exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id))
function exec_conn_func(w::Worker)
if isnull(w.conn_func)
return wait_for_conn(w) # Some other task may be trying to connect at the same time.
end

try
f = get(w.conn_func)
w.conn_func = Nullable{Function}()
f()
catch e
w.conn_func = () -> throw(e)
rethrow(e)
end
nothing
end

function wait_for_conn(w)
if w.state == W_CREATED
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

@schedule (sleep(timeout); notify(w.c_state; all=true))
wait(w.c_state)
w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
nothing
end

## process group creation ##

mutable struct LocalProcess
Expand Down Expand Up @@ -340,6 +377,17 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
params = merge(default_addprocs_params(), AnyDict(kwargs))
topology(Symbol(params[:topology]))

if PGRP.topology != :all_to_all
params[:lazy] = false
end

if isnull(PGRP.lazy) || nprocs() == 1
PGRP.lazy = Nullable{Bool}(params[:lazy])
elseif isclusterlazy() != params[:lazy]
throw(ErrorException(string("Active workers with lazy=", isclusterlazy(),
". Cannot set lazy=", params[:lazy])))
end

# References to launched workers, filled when each worker is fully initialized and
# has connected to all nodes.
launched_q = Int[] # Asynchronously filled by the launch method
Expand Down Expand Up @@ -396,7 +444,8 @@ default_addprocs_params() = AnyDict(
:dir => pwd(),
:exename => joinpath(JULIA_HOME, julia_exename()),
:exeflags => ``,
:enable_threaded_blas => false)
:enable_threaded_blas => false,
:lazy => true)


function setup_launched_worker(manager, wconfig, launched_q)
Expand Down Expand Up @@ -517,7 +566,7 @@ function create_worker(manager, wconfig)

all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list)
send_connection_hdr(w, true)
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false))
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false), isclusterlazy())
send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)

@schedule manage(w.manager, w.id, w.config, :register)
Expand Down Expand Up @@ -619,8 +668,9 @@ mutable struct ProcessGroup
workers::Array{Any,1}
refs::Dict # global references
topology::Symbol
lazy::Nullable{Bool}

ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all)
ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, Nullable{Bool}())
end
const PGRP = ProcessGroup([])

Expand All @@ -634,6 +684,14 @@ function topology(t)
t
end

function isclusterlazy()
if isnull(PGRP.lazy)
return false
else
return get(PGRP.lazy)
end
end

get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid))
get_bind_addr(w::LocalProcess) = LPROC.bind_addr
function get_bind_addr(w::Worker)
Expand Down Expand Up @@ -667,7 +725,7 @@ myid() = LPROC.id
Get the number of available processes.
"""
function nprocs()
if myid() == 1 || PGRP.topology == :all_to_all
if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
n = length(PGRP.workers)
# filter out workers in the process of being setup/shutdown.
for jw in PGRP.workers
Expand Down Expand Up @@ -698,7 +756,7 @@ end
Returns a list of all process identifiers.
"""
function procs()
if myid() == 1 || PGRP.topology == :all_to_all
if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
# filter out workers in the process of being setup/shutdown.
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)]
else
Expand All @@ -707,7 +765,7 @@ function procs()
end

function id_in_procs(id) # faster version of `id in procs()`
if myid() == 1 || PGRP.topology == :all_to_all
if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
for x in PGRP.workers
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state == W_CONNECTED)
return true
Expand Down Expand Up @@ -903,7 +961,7 @@ function deregister_worker(pg, pid)
if myid() == 1 && isdefined(w, :config)
# Notify the cluster manager of this workers death
manage(w.manager, w.id, w.config, :deregister)
if PGRP.topology != :all_to_all
if PGRP.topology != :all_to_all || isclusterlazy()
for rpid in workers()
try
remote_do(deregister_worker, rpid, pid)
Expand Down
6 changes: 5 additions & 1 deletion base/distributed/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ Keyword arguments:
A worker with a cluster manager identity `ident` will connect to all workers specified
in `connect_idents`.
* `lazy`: Applicable only with `topology=:all_to_all`. If `true`, worker-worker connections
are setup lazily, i.e., they are setup at the first instance of a remote call between
workers. Default is true.
Environment variables :
Expand Down Expand Up @@ -302,7 +306,7 @@ addprocs(; kwargs...) = addprocs(Sys.CPU_CORES; kwargs...)
Launches workers using the in-built `LocalManager` which only launches workers on the
local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4
processes on the local machine. If `restrict` is `true`, binding is restricted to
`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, and
`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, `lazy` and
`enable_threaded_blas` have the same effect as documented for `addprocs(machines)`.
"""
function addprocs(np::Integer; restrict=true, kwargs...)
Expand Down
1 change: 1 addition & 0 deletions base/distributed/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct JoinPGRPMsg <: AbstractMsg
other_workers::Array
topology::Symbol
enable_threaded_blas::Bool
lazy::Bool
end
struct JoinCompleteMsg <: AbstractMsg
cpu_cores::Int
Expand Down
12 changes: 10 additions & 2 deletions base/distributed/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,22 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
disable_threaded_libs()
end

lazy = msg.lazy
PGRP.lazy = Nullable{Bool}(lazy)

wait_tasks = Task[]
for (connect_at, rpid) in msg.other_workers
wconfig = WorkerConfig()
wconfig.connect_at = connect_at

let rpid=rpid, wconfig=wconfig
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
push!(wait_tasks, t)
if lazy
# The constructor registers the object with a global registry.
Worker(rpid, Nullable{Function}(()->connect_to_peer(cluster_manager, rpid, wconfig)))
else
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
push!(wait_tasks, t)
end
end
end

Expand Down
6 changes: 6 additions & 0 deletions doc/src/manual/parallel-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,12 @@ connected to each other:
fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster-manager-provided
identity `ident` will connect to all workers specified in `connect_idents`.

Keyword argument `lazy=true|false` only affects `topology` option `:all_to_all`. If `true`, the cluster
starts off with the master connected to all workers. Specific worker-worker connections are established
at the first remote invocation between two workers. This helps in reducing initial resources allocated for
intra-cluster communication. Connections are setup depending on the runtime requirements of a parallel
program. Default value for `lazy` is `true`.

Currently, sending a message between unconnected workers results in an error. This behaviour,
as with the functionality and interface, should be considered experimental in nature and may change
in future releases.
Expand Down
2 changes: 1 addition & 1 deletion test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ include("testenv.jl")
1
end

addprocs_with_testenv(4)
addprocs_with_testenv(4; lazy=false)
@test nprocs() == 5

function reuseport_tests()
Expand Down
42 changes: 42 additions & 0 deletions test/topology.jl
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,45 @@ for p1 in workers()
end

remove_workers_and_test()

# test `lazy` connection setup
function def_count_conn()
@everywhere function count_connected_workers()
count(x -> isa(x, Base.Distributed.Worker) && isdefined(x, :r_stream) && isopen(x.r_stream),
Base.Distributed.PGRP.workers)
end
end

addprocs_with_testenv(8)
def_count_conn()

# Test for 10 random combinations
wl = workers()
combinations = []
while length(combinations) < 10
from = rand(wl)
to = rand(wl)
if from == to || ((from,to) in combinations) || ((to,from) in combinations)
continue
else
push!(combinations, (from,to))
end
end

# Initially only master-slave connections ought to be setup
expected_num_conns = 8
num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers()))
@test num_conns == expected_num_conns

for (i, (from,to)) in enumerate(combinations)
remotecall_wait(topid->remotecall_fetch(myid, topid), from, to)
expected_num_conns += 2 # one connection endpoint on both from and to
num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers()))
@test num_conns == expected_num_conns
end

# With lazy=false, all connections ought to be setup during `addprocs`
rmprocs(workers())
addprocs_with_testenv(8; lazy=false)
def_count_conn()
@test sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) == 64

0 comments on commit 9b30e01

Please sign in to comment.