From 9b30e01995144e3591cdd0db173e3d1390250442 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Fri, 14 Jul 2017 20:59:52 +0530 Subject: [PATCH] support lazy all_to_all connection setups --- NEWS.md | 3 + base/distributed/cluster.jl | 96 ++++++++++++++++++++++------ base/distributed/managers.jl | 6 +- base/distributed/messages.jl | 1 + base/distributed/process_messages.jl | 12 +++- doc/src/manual/parallel-computing.md | 6 ++ test/distributed_exec.jl | 2 +- test/topology.jl | 42 ++++++++++++ 8 files changed, 145 insertions(+), 23 deletions(-) diff --git a/NEWS.md b/NEWS.md index c00a3e18b0e9b..a9f7983af2442 100644 --- a/NEWS.md +++ b/NEWS.md @@ -63,6 +63,9 @@ This section lists changes that do not have deprecation warnings. * The `Diagonal` type definition has changed from `Diagonal{T}` to `Diagonal{T,V<:AbstractVector{T}}` ([#22718]). + * Worker-worker connections are setup lazily for a `:all_to_all` topology. Use keyword + arg `lazy=false` to force all connections to be setup during a `addprocs` call. ([#22814]) + Library improvements -------------------- diff --git a/base/distributed/cluster.jl b/base/distributed/cluster.jl index f82314bab58e5..557186c1e074e 100644 --- a/base/distributed/cluster.jl +++ b/base/distributed/cluster.jl @@ -59,6 +59,7 @@ mutable struct Worker state::WorkerState c_state::Condition # wait for state changes ct_time::Float64 # creation time + conn_func::Nullable{Function} # Used to setup connections lazily r_stream::IO w_stream::IO @@ -82,12 +83,13 @@ mutable struct Worker w end - function Worker(id::Int) + Worker(id::Int) = Worker(id, Nullable{Function}()) + function Worker(id::Int, conn_func) @assert id > 0 if haskey(map_pid_wrkr, id) return map_pid_wrkr[id] end - w=new(id, [], [], false, W_CREATED, Condition(), time()) + w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func) register_worker(w) w end @@ -102,21 +104,56 @@ end function check_worker_state(w::Worker) if w.state == W_CREATED - if PGRP.topology == :all_to_all - # Since higher pids connect with lower pids, the remote worker - # may not have connected to us yet. Wait for some time. - timeout = worker_timeout() - (time() - w.ct_time) - timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") - - @schedule (sleep(timeout); notify(w.c_state; all=true)) - wait(w.c_state) - w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") + if !isclusterlazy() + if PGRP.topology == :all_to_all + # Since higher pids connect with lower pids, the remote worker + # may not have connected to us yet. Wait for some time. + wait_for_conn(w) + else + error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology)) + end else - error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology)) + w.ct_time = time() + if myid() > w.id + @schedule exec_conn_func(w) + else + # route request via node 1 + @schedule remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) + end + wait_for_conn(w) end end end +exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id)) +function exec_conn_func(w::Worker) + if isnull(w.conn_func) + return wait_for_conn(w) # Some other task may be trying to connect at the same time. + end + + try + f = get(w.conn_func) + w.conn_func = Nullable{Function}() + f() + catch e + w.conn_func = () -> throw(e) + rethrow(e) + end + nothing +end + +function wait_for_conn(w) + if w.state == W_CREATED + timeout = worker_timeout() - (time() - w.ct_time) + timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") + + @schedule (sleep(timeout); notify(w.c_state; all=true)) + wait(w.c_state) + w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") + end + nothing +end + ## process group creation ## mutable struct LocalProcess @@ -340,6 +377,17 @@ function addprocs_locked(manager::ClusterManager; kwargs...) params = merge(default_addprocs_params(), AnyDict(kwargs)) topology(Symbol(params[:topology])) + if PGRP.topology != :all_to_all + params[:lazy] = false + end + + if isnull(PGRP.lazy) || nprocs() == 1 + PGRP.lazy = Nullable{Bool}(params[:lazy]) + elseif isclusterlazy() != params[:lazy] + throw(ErrorException(string("Active workers with lazy=", isclusterlazy(), + ". Cannot set lazy=", params[:lazy]))) + end + # References to launched workers, filled when each worker is fully initialized and # has connected to all nodes. launched_q = Int[] # Asynchronously filled by the launch method @@ -396,7 +444,8 @@ default_addprocs_params() = AnyDict( :dir => pwd(), :exename => joinpath(JULIA_HOME, julia_exename()), :exeflags => ``, - :enable_threaded_blas => false) + :enable_threaded_blas => false, + :lazy => true) function setup_launched_worker(manager, wconfig, launched_q) @@ -517,7 +566,7 @@ function create_worker(manager, wconfig) all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list) send_connection_hdr(w, true) - join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false)) + join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false), isclusterlazy()) send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message) @schedule manage(w.manager, w.id, w.config, :register) @@ -619,8 +668,9 @@ mutable struct ProcessGroup workers::Array{Any,1} refs::Dict # global references topology::Symbol + lazy::Nullable{Bool} - ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all) + ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, Nullable{Bool}()) end const PGRP = ProcessGroup([]) @@ -634,6 +684,14 @@ function topology(t) t end +function isclusterlazy() + if isnull(PGRP.lazy) + return false + else + return get(PGRP.lazy) + end +end + get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid)) get_bind_addr(w::LocalProcess) = LPROC.bind_addr function get_bind_addr(w::Worker) @@ -667,7 +725,7 @@ myid() = LPROC.id Get the number of available processes. """ function nprocs() - if myid() == 1 || PGRP.topology == :all_to_all + if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy()) n = length(PGRP.workers) # filter out workers in the process of being setup/shutdown. for jw in PGRP.workers @@ -698,7 +756,7 @@ end Returns a list of all process identifiers. """ function procs() - if myid() == 1 || PGRP.topology == :all_to_all + if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy()) # filter out workers in the process of being setup/shutdown. return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)] else @@ -707,7 +765,7 @@ function procs() end function id_in_procs(id) # faster version of `id in procs()` - if myid() == 1 || PGRP.topology == :all_to_all + if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy()) for x in PGRP.workers if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state == W_CONNECTED) return true @@ -903,7 +961,7 @@ function deregister_worker(pg, pid) if myid() == 1 && isdefined(w, :config) # Notify the cluster manager of this workers death manage(w.manager, w.id, w.config, :deregister) - if PGRP.topology != :all_to_all + if PGRP.topology != :all_to_all || isclusterlazy() for rpid in workers() try remote_do(deregister_worker, rpid, pid) diff --git a/base/distributed/managers.jl b/base/distributed/managers.jl index f14f46829f312..b3f2cb6222db5 100644 --- a/base/distributed/managers.jl +++ b/base/distributed/managers.jl @@ -100,6 +100,10 @@ Keyword arguments: A worker with a cluster manager identity `ident` will connect to all workers specified in `connect_idents`. +* `lazy`: Applicable only with `topology=:all_to_all`. If `true`, worker-worker connections + are setup lazily, i.e., they are setup at the first instance of a remote call between + workers. Default is true. + Environment variables : @@ -302,7 +306,7 @@ addprocs(; kwargs...) = addprocs(Sys.CPU_CORES; kwargs...) Launches workers using the in-built `LocalManager` which only launches workers on the local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4 processes on the local machine. If `restrict` is `true`, binding is restricted to -`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, and +`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, `lazy` and `enable_threaded_blas` have the same effect as documented for `addprocs(machines)`. """ function addprocs(np::Integer; restrict=true, kwargs...) diff --git a/base/distributed/messages.jl b/base/distributed/messages.jl index df5cfd0255aa8..12f347fa493f0 100644 --- a/base/distributed/messages.jl +++ b/base/distributed/messages.jl @@ -68,6 +68,7 @@ struct JoinPGRPMsg <: AbstractMsg other_workers::Array topology::Symbol enable_threaded_blas::Bool + lazy::Bool end struct JoinCompleteMsg <: AbstractMsg cpu_cores::Int diff --git a/base/distributed/process_messages.jl b/base/distributed/process_messages.jl index 1eba6eafd496d..a86799c5eae33 100644 --- a/base/distributed/process_messages.jl +++ b/base/distributed/process_messages.jl @@ -307,14 +307,22 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) disable_threaded_libs() end + lazy = msg.lazy + PGRP.lazy = Nullable{Bool}(lazy) + wait_tasks = Task[] for (connect_at, rpid) in msg.other_workers wconfig = WorkerConfig() wconfig.connect_at = connect_at let rpid=rpid, wconfig=wconfig - t = @async connect_to_peer(cluster_manager, rpid, wconfig) - push!(wait_tasks, t) + if lazy + # The constructor registers the object with a global registry. + Worker(rpid, Nullable{Function}(()->connect_to_peer(cluster_manager, rpid, wconfig))) + else + t = @async connect_to_peer(cluster_manager, rpid, wconfig) + push!(wait_tasks, t) + end end end diff --git a/doc/src/manual/parallel-computing.md b/doc/src/manual/parallel-computing.md index e8f93351f0771..148706770ae52 100644 --- a/doc/src/manual/parallel-computing.md +++ b/doc/src/manual/parallel-computing.md @@ -1298,6 +1298,12 @@ connected to each other: fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster-manager-provided identity `ident` will connect to all workers specified in `connect_idents`. +Keyword argument `lazy=true|false` only affects `topology` option `:all_to_all`. If `true`, the cluster +starts off with the master connected to all workers. Specific worker-worker connections are established +at the first remote invocation between two workers. This helps in reducing initial resources allocated for +intra-cluster communication. Connections are setup depending on the runtime requirements of a parallel +program. Default value for `lazy` is `true`. + Currently, sending a message between unconnected workers results in an error. This behaviour, as with the functionality and interface, should be considered experimental in nature and may change in future releases. diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index e681a356dc925..969e2ab3c71a3 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -10,7 +10,7 @@ include("testenv.jl") 1 end -addprocs_with_testenv(4) +addprocs_with_testenv(4; lazy=false) @test nprocs() == 5 function reuseport_tests() diff --git a/test/topology.jl b/test/topology.jl index 52cf1bfa29be3..a0806428bc48c 100644 --- a/test/topology.jl +++ b/test/topology.jl @@ -90,3 +90,45 @@ for p1 in workers() end remove_workers_and_test() + +# test `lazy` connection setup +function def_count_conn() + @everywhere function count_connected_workers() + count(x -> isa(x, Base.Distributed.Worker) && isdefined(x, :r_stream) && isopen(x.r_stream), + Base.Distributed.PGRP.workers) + end +end + +addprocs_with_testenv(8) +def_count_conn() + +# Test for 10 random combinations +wl = workers() +combinations = [] +while length(combinations) < 10 + from = rand(wl) + to = rand(wl) + if from == to || ((from,to) in combinations) || ((to,from) in combinations) + continue + else + push!(combinations, (from,to)) + end +end + +# Initially only master-slave connections ought to be setup +expected_num_conns = 8 +num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) +@test num_conns == expected_num_conns + +for (i, (from,to)) in enumerate(combinations) + remotecall_wait(topid->remotecall_fetch(myid, topid), from, to) + expected_num_conns += 2 # one connection endpoint on both from and to + num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) + @test num_conns == expected_num_conns +end + +# With lazy=false, all connections ought to be setup during `addprocs` +rmprocs(workers()) +addprocs_with_testenv(8; lazy=false) +def_count_conn() +@test sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) == 64