Introduced ClusterManager. Externalized support for new cluster types. #3649

Merged: 2 commits, Jul 9, 2013
Changes from all commits
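For orientation, a minimal sketch (not part of the diff) of how the reworked `addprocs` entry point is intended to be called after this change, based on the `addprocs(instances; ..., cman=nothing)` signature and the `RegularCluster` default introduced in base/multi.jl below; `MyManager` is a hypothetical externally defined manager type:

    addprocs(4)                          # local workers via the default RegularCluster
    addprocs({"host1","user@host2"})     # SSH workers, as before; now returns the new worker ids
    addprocs(8, cman=MyManager())        # workers launched by an external ClusterManager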
10 changes: 10 additions & 0 deletions base/deprecated.jl
@@ -274,3 +274,13 @@ function amap(f::Function, A::AbstractArray, axis::Integer)

return R
end

function addprocs_scyld(np::Integer)
error("Base.addprocs_scyld is deprecated - add package ClusterManagers and then use ClusterManagers.addprocs_scyld instead.")
end
export addprocs_scyld

function addprocs_sge(np::Integer)
error("Base.addprocs_sge is deprecated - add package ClusterManagers and then use ClusterManagers.addprocs_sge instead.")
end
export addprocs_sge
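As the deprecation messages above spell out, the SGE and Scyld launchers move out of Base into the external ClusterManagers package. A rough sketch of the replacement calls, with the package and function names taken from the error strings (not verified here):

    using ClusterManagers
    ClusterManagers.addprocs_sge(4)      # replaces Base.addprocs_sge(4)
    ClusterManagers.addprocs_scyld(4)    # replaces Base.addprocs_scyld(4)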
3 changes: 1 addition & 2 deletions base/exports.jl
@@ -1092,8 +1092,6 @@ export

# multiprocessing
addprocs,
addprocs_scyld,
addprocs_sge,
fetch,
isready,
yield,
@@ -1110,6 +1108,7 @@ export
remotecall_wait,
take,
wait,
ClusterManager,

# distributed arrays
distribute,
281 changes: 129 additions & 152 deletions base/multi.jl
@@ -3,8 +3,6 @@
## julia starts with one process, and processors can be added using:
## addprocs(n) using exec
## addprocs({"host1","host2",...}) using remote execution
## addprocs_scyld(n) using Scyld ClusterWare
## addprocs_sge(n) using Sun Grid Engine batch queue
##
## remotecall(w, func, args...) -
## tell a worker to call a function on the given arguments.
@@ -210,7 +208,7 @@ function add_workers(pg::ProcessGroup, w::Array{Any,1})
for i=1:length(w)
send_msg_now(w[i], :join_pgrp, w[i].id, all_locs)
end
:ok
[w[i].id for i in 1:length(w)]
end

myid() = LPROC.id
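One user-visible consequence of the hunk above: `add_workers` now returns the ids of the newly started workers instead of `:ok`, and the reworked `addprocs` further down passes that result through, so, for example:

    new_ids = addprocs(2)    # e.g. [2,3] on a fresh session, instead of :ok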
@@ -860,7 +858,7 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
end)
end

function disable_parallel_libs()
function disable_threaded_libs()
blas_set_num_threads(1)
end

@@ -879,7 +877,7 @@ function start_worker(out::IO)
# close STDIN; workers will not use it
#close(STDIN)

disable_parallel_libs()
disable_threaded_libs()

ccall(:jl_install_sigint_handler, Void, ())

@@ -896,42 +894,86 @@ function start_worker(out::IO)
exit(0)
end


function start_remote_workers(machines, cmds, tunnel=false, sshflags=``)
n = length(cmds)
outs = cell(n)
function start_cluster_workers(n, config)
w = cell(n)
for i=1:n
outs[i],_ = readsfrom(cmds[i])
outs[i].line_buffered = true
cman = config[:cman]

# Get the cluster manager to launch the instance
(insttype, instances) = cman.launch_cb(n, config)


if insttype == :io_only
read_cb_response(inst) =
begin
(host, port) = read_worker_host_port(inst)
inst, host, port
end
elseif insttype == :io_host
read_cb_response(inst) =
begin
io = inst[1]
(_, port) = read_worker_host_port(io)
io, inst[2], port
end
elseif insttype == :io_host_port
read_cb_response(inst) = (inst[1], inst[2], inst[3])
elseif insttype == :host_port
read_cb_response(inst) = (nothing, inst[1], inst[2])
elseif insttype == :cmd
read_cb_response(inst) =
begin
io,_ = readsfrom(detach(inst))
io.line_buffered = true
(host, port) = read_worker_host_port(io)
io, host, port
end
else
error("Unsupported format from Cluster Manager callback")
end

for i=1:n
local hostname::String, port::Int16
stream = outs[i]
stream.line_buffered = true
while true
conninfo = readline(stream)
private_hostname, port = parse_connection_info(conninfo)
if private_hostname != ""
break
end
(io, host, port) = read_cb_response(instances[i])
w[i] = create_worker(host, port, io, config)
end
w
end

function read_worker_host_port (io::IO)
io.line_buffered = true
while true
conninfo = readline(io)
private_hostname, port = parse_connection_info(conninfo)
if private_hostname != ""
return private_hostname, port
end

s = split(machines[i],'@')
if length(s) > 1
user = s[1]
hostname = s[2]
else
end
end

function create_worker(hostname, port, stream, config)
tunnel = config[:tunnel]

s = split(hostname,'@')
if length(s) > 1
user = s[1]
hostname = s[2]
else
if haskey(ENV, "USER")
user = ENV["USER"]
hostname = s[1]
end

if tunnel
w[i] = Worker(hostname, port, user, sshflags)
else
w[i] = Worker(hostname, port)
elseif tunnel
error("USER must be specified either in the environment or as part of the hostname when tunnel option is used.")
end
let wrker = w[i]
hostname = s[1]
end

if tunnel
sshflags = config[:sshflags]
w = Worker(hostname, port, user, sshflags)
else
w = Worker(hostname, port)
end

if isa(stream, AsyncStream)
let wrker = w
# redirect console output from workers to the client's stdout:
start_reading(stream,function(stream::AsyncStream,nread::Int)
if nread>0
@@ -950,6 +992,7 @@ function start_remote_workers(machines, cmds, tunnel=false, sshflags=``)
w
end


function parse_connection_info(str)
m = match(r"^julia_worker:(\d+)#(.*)", str)
if m != nothing
@@ -977,135 +1020,69 @@ function ssh_tunnel(user, host, port, sshflags)
localp
end

#function worker_ssh_cmd(host, key)
# `ssh -i $key -n $host "sh -l -c \"cd $JULIA_HOME && ./julia-release-basic --worker\""`
#end

# start and connect to processes via SSH.
# optionally through an SSH tunnel.
# the tunnel is only used from the head (process 1); the nodes are assumed
# to be mutually reachable without a tunnel, as is often the case in a cluster.
function addprocs(machines::AbstractVector;
tunnel=false, dir=JULIA_HOME, exename="./julia-release-basic", sshflags::Cmd=``)
add_workers(PGRP,
start_remote_workers(machines,
map(m->detach(`ssh -n $sshflags $m "sh -l -c \"cd $dir && $exename --worker\""`),
machines),
tunnel, sshflags))
end

#function addprocs_ssh(machines, keys)
# if !(isa(keys, Array)) && isa(machines,Array)
# key = keys
# keys = [ key for x = 1:length(machines)]
# cmdargs = { {machines[x],keys[x]} for x = 1:length(machines)}
# else
# cmdargs = {{machines,keys}}
# end #if/else
# add_workers(PGRP, start_remote_workers(machines, map(x->worker_ssh_cmd(x[1],x[2]), cmdargs)))
#end
abstract ClusterManager

worker_local_cmd() = `$JULIA_HOME/julia-release-basic --bind-to $bind_addr --worker`

function addprocs(np::Integer)
disable_parallel_libs()
add_workers(PGRP, start_remote_workers({ "localhost" for i=1:np },
{ worker_local_cmd() for i=1:np }))
end

function start_scyld_workers(np::Integer)
home = JULIA_HOME
beomap_cmd = `beomap --no-local --np $np`
out,beomap_proc = readsfrom(beomap_cmd)
wait(beomap_proc)
if !success(beomap_proc)
error("node availability inaccessible (could not run beomap)")
end
nodes = split(chomp(readline(out)),':')
outs = cell(np)
for (i,node) in enumerate(nodes)
cmd = detach(`bpsh $node sh -l -c "cd $home && ./julia-release-basic --worker"`)
outs[i],_ = readsfrom(cmd)
outs[i].line_buffered = true
end
workers = cell(np)
for (i,stream) in enumerate(outs)
local hostname::String, port::Int16
stream.line_buffered = true
while true
conninfo = readline(stream)
hostname, port = parse_connection_info(conninfo)
if hostname != ""
break
end
end
workers[i] = Worker(hostname, port)
let worker = workers[i]
# redirect console output from workers to the client's stdout:
start_reading(stream,function(stream::AsyncStream,nread::Int)
if(nread>0)
try
line = readbytes(stream.buffer, nread)
print("\tFrom worker $(worker.id):\t",line)
catch err
println(STDERR,"\tError parsing reply from worker $(worker.id):\t",err)
return false
end
end
true
end)
function launch_procs(n::Integer, config::Dict)
dir = config[:dir]
exename = config[:exename]
exeflags = config[:exeflags]

cman = config[:cman]
if cman.ssh
sshflags = config[:sshflags]
outs=cell(n)
for i in 1:n
m = cman.machines[i]
cmd = detach(`ssh -n $sshflags $m "sh -l -c \"cd $dir && $exename $exeflags\""`)
io,_ = readsfrom(cmd)
io.line_buffered = true
local port::Int16
(_, port) = read_worker_host_port (io)

# We ignore the hostname printed by the worker, since the worker may be behind a NATed interface,
# we just use the hostname specified by the caller as part of the machine name
outs[i] = (io, m, port)
end
return (:io_host_port, outs)

else
worker_local_cmd = `$(dir)/$(exename) --bind-to $bind_addr $exeflags`
return (:cmd, {worker_local_cmd for i in 1:n})
end
workers
end

function addprocs_scyld(np::Integer)
disable_parallel_libs()
add_workers(PGRP, start_scyld_workers(np))
immutable RegularCluster <: ClusterManager
launch_cb::Function
ssh::Bool
machines

RegularCluster(; ssh=false, machines=[]) = new(launch_procs, ssh, machines)
end

function start_sge_workers(n)
home = JULIA_HOME
sgedir = joinpath(pwd(),"SGE")
run(`mkdir -p $sgedir`)
qsub_cmd = `echo $home/julia-release-basic --worker` |> `qsub -N JULIA -terse -cwd -j y -o $sgedir -t 1:$n`
out,qsub_proc = readsfrom(qsub_cmd)
if !success(qsub_proc)
error("batch queue not available (could not run qsub)")
end
id = chomp(split(readline(out),'.')[1])
println("job id is $id")
print("waiting for job to start");
workers = cell(n)
for i=1:n
# wait for each output stream file to get created
fname = "$sgedir/JULIA.o$(id).$(i)"
local fl, hostname, port
fexists = false
sleep(0.5)
while !fexists
try
fl = open(fname)
try
conninfo = readline(fl)
hostname, port = parse_connection_info(conninfo)
finally
close(fl)
end
fexists = (hostname != "")
catch
print(".");
sleep(0.5)
end
# start and connect to processes via SSH.
# optionally through an SSH tunnel.
# the tunnel is only used from the head (process 1); the nodes are assumed
# to be mutually reachable without a tunnel, as is often the case in a cluster.
function addprocs(instances::Union(AbstractVector, Integer);
tunnel=false, dir=JULIA_HOME, exename="./julia-release-basic", sshflags::Cmd=``, cman=nothing)

config={:dir=>dir, :exename=>exename, :exeflags=>` --worker `, :tunnel=>tunnel, :sshflags=>sshflags}
disable_threaded_libs()

if isa(instances, AbstractVector) && (cman == nothing)
config[:cman] = RegularCluster(ssh=true, machines=instances)
return add_workers(PGRP, start_cluster_workers(length(instances), config))
else
if isa(cman, ClusterManager)
config[:cman] = cman
else
config[:cman] = RegularCluster()
end
#print("hostname=$hostname, port=$port\n")
workers[i] = Worker(hostname, port)
return add_workers(PGRP, start_cluster_workers(instances, config))
end
print("\n")
workers
end

addprocs_sge(n) = add_workers(PGRP, start_sge_workers(n))

## higher-level functions: spawn, pmap, pfor, etc. ##

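To summarize the extension point this diff introduces: a cluster manager is a type derived from `ClusterManager` whose `launch_cb(n, config)` callback returns a tuple `(insttype, instances)`, with `insttype` being one of `:io_only`, `:io_host`, `:io_host_port`, `:host_port` or `:cmd`, exactly as dispatched in `start_cluster_workers` above. A minimal hypothetical manager (not part of this PR) that launches local workers and hands plain `Cmd` objects back could look roughly like this, written against the 0.2-era syntax used in the diff:

    immutable LocalTestCluster <: ClusterManager
        launch_cb::Function
        LocalTestCluster() = new(launch_local_test)
    end

    function launch_local_test(n::Integer, config::Dict)
        dir = config[:dir]
        exename = config[:exename]
        exeflags = config[:exeflags]
        # Return :cmd entries: start_cluster_workers will detach and spawn each
        # command, read the printed "julia_worker:<port>#<host>" line and then
        # call create_worker with the resulting stream, host and port.
        # bind_addr is the same global the built-in launch_procs uses above.
        worker_cmd = `$(dir)/$(exename) --bind-to $bind_addr $exeflags`
        return (:cmd, { worker_cmd for i in 1:n })
    end

    # hypothetical usage:
    # addprocs(4, cman=LocalTestCluster())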